You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/21 06:49:13 UTC

[servicecomb-kie] branch master updated: [fix] update etcdadpt and cari in go.mod (#241)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 640c5ac  [fix] update etcdadpt and cari in go.mod (#241)
640c5ac is described below

commit 640c5ac2365741ba113d2585ca7b5cb8fd1703e5
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 21 14:49:05 2022 +0800

    [fix] update etcdadpt and cari in go.mod (#241)
---
 cmd/kieserver/main.go                              |   3 +
 go.mod                                             |   7 +-
 go.sum                                             |  15 +-
 server/datasource/etcd/init.go                     |  20 +-
 server/datasource/mongo/counter/revision.go        |   8 +-
 server/datasource/mongo/history/history_dao.go     |  11 +-
 server/datasource/mongo/init.go                    | 132 ++++++++-
 server/datasource/mongo/kv/kv_dao.go               |  47 ++--
 server/datasource/mongo/model/model.go             |  28 ++
 server/datasource/mongo/session/session.go         | 312 ---------------------
 server/datasource/mongo/session/session_test.go    |  34 ---
 server/datasource/mongo/session/struct.go          |  11 -
 .../datasource/mongo/track/polling_detail_dao.go   |  12 +-
 server/resource/v1/common.go                       |  11 +-
 test/init.go                                       |  23 +-
 15 files changed, 235 insertions(+), 439 deletions(-)

diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go
index bc717a2..2572f08 100644
--- a/cmd/kieserver/main.go
+++ b/cmd/kieserver/main.go
@@ -29,6 +29,9 @@ import (
 	_ "github.com/go-chassis/go-chassis/v2/middleware/monitoring"
 	_ "github.com/go-chassis/go-chassis/v2/middleware/ratelimiter"
 
+	// cari db
+	_ "github.com/go-chassis/cari/db/bootstrap"
+
 	//storage
 	_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
 	_ "github.com/apache/servicecomb-kie/server/datasource/mongo"
diff --git a/go.mod b/go.mod
index 9179e01..de2a2be 100644
--- a/go.mod
+++ b/go.mod
@@ -1,9 +1,9 @@
 module github.com/apache/servicecomb-kie
 
 require (
-	github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd
+	github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca
 	github.com/emicklei/go-restful v2.12.0+incompatible
-	github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6
+	github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d
 	github.com/go-chassis/foundation v0.4.0
 	github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b
 	github.com/go-chassis/go-chassis/v2 v2.3.1-0.20211217084436-360a6a6a0ef3
@@ -11,11 +11,10 @@ require (
 	github.com/go-chassis/seclog v1.3.1-0.20210917082355-52c40864f240
 	github.com/gofrs/uuid v4.0.0+incompatible
 	github.com/hashicorp/serf v0.9.5
-	github.com/little-cui/etcdadpt v0.3.1
+	github.com/little-cui/etcdadpt v0.3.2
 	github.com/stretchr/testify v1.7.0
 	github.com/urfave/cli v1.22.4
 	go.mongodb.org/mongo-driver v1.4.6
-	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 	gopkg.in/yaml.v2 v2.4.0
 )
 
diff --git a/go.sum b/go.sum
index 7024637..ad81b04 100644
--- a/go.sum
+++ b/go.sum
@@ -40,8 +40,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd h1:U1NiPGnnbZML/kOY52v6BCh1SlXrcsAnzZmj1IWAgTk=
-github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd/go.mod h1:B/u1cq2IHLrbUTfC5jTM8rQFhJ7XZhmemqMWR0CGDHo=
+github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca h1:e4NoNmQYa7y0K+jJSbvQHsQ+mVI1ThBk/pP21MbFblQ=
+github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca/go.mod h1:eClC23tfQ0Q4vdesWnBeFfEyf58XyEQhoWki3+HSQ9s=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
@@ -146,13 +146,11 @@ github.com/go-chassis/cari v0.0.0-20201210041921-7b6fbef2df11/go.mod h1:MgtsEI0A
 github.com/go-chassis/cari v0.4.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.5.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.5.1-0.20210823023004-74041d1363c4/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
-github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6 h1:7Ino94E57cnvKmyKVR0bDhZl/jcI+U2VMsHUD6qAvwg=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
+github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d h1:RtBn1T7KmJM1j1+NlBFqaKJWPWPDde9adDQMFHCKMbU=
+github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d/go.mod h1:tKTzguHTGohMCgkcWNZWtA4TwfcsJrIXpfYxsQtb7uw=
 github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.2.2/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.3.0/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
-github.com/go-chassis/foundation v0.3.1-0.20210806081520-3bd92d1ef787/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/foundation v0.4.0 h1:z0xETnSxF+vRXWjoIhOdzt6rywjZ4sB++utEl4YgWEY=
 github.com/go-chassis/foundation v0.4.0/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/go-archaius v1.5.1/go.mod h1:QPwvvtBxvwiC48rmydoAqxopqOr93RCQ6syWsIkXPXQ=
@@ -396,9 +394,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
 github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
-github.com/little-cui/etcdadpt v0.2.1/go.mod h1:727wftF2FS4vfkgFLmIvQue1XH+9u4lK2/hd6L7OAC8=
-github.com/little-cui/etcdadpt v0.3.1 h1:lAPIffcOR6jROu/mWf+zHscV8urIu1qbsJvwvziLWDY=
-github.com/little-cui/etcdadpt v0.3.1/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
+github.com/little-cui/etcdadpt v0.3.2 h1:EBXPBxddZXTgWvGsIdAqqG6JCu1TouPNUhVVj9swt/s=
+github.com/little-cui/etcdadpt v0.3.2/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index 622a314..8f0524a 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -21,9 +21,9 @@ import (
 	"crypto/tls"
 	"fmt"
 
-	// support embedded etcd
-	_ "github.com/little-cui/etcdadpt/embedded"
-	_ "github.com/little-cui/etcdadpt/remote"
+	"github.com/go-chassis/cari/db"
+	dconfig "github.com/go-chassis/cari/db/config"
+	"github.com/go-chassis/openlog"
 
 	"github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
@@ -32,8 +32,6 @@ import (
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/kv"
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/track"
 	"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
-	"github.com/go-chassis/openlog"
-	"github.com/little-cui/etcdadpt"
 )
 
 type Broker struct {
@@ -50,11 +48,13 @@ func NewFrom(c *datasource.Config) (datasource.Broker, error) {
 			return nil, err
 		}
 	}
-	return &Broker{}, etcdadpt.Init(etcdadpt.Config{
-		Kind:             kind,
-		ClusterAddresses: c.URI,
-		SslEnabled:       c.SSLEnabled,
-		TLSConfig:        tlsConfig,
+	return &Broker{}, db.Init(&dconfig.Config{
+		Kind:       kind,
+		URI:        c.URI,
+		PoolSize:   c.PoolSize,
+		SSLEnabled: c.SSLEnabled,
+		TLSConfig:  tlsConfig,
+		Timeout:    c.Timeout,
 	})
 }
 func (*Broker) GetRevisionDao() datasource.RevisionDao {
diff --git a/server/datasource/mongo/counter/revision.go b/server/datasource/mongo/counter/revision.go
index 1622b61..1fb42bb 100644
--- a/server/datasource/mongo/counter/revision.go
+++ b/server/datasource/mongo/counter/revision.go
@@ -21,10 +21,12 @@ import (
 	"context"
 	"errors"
 
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	"github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/openlog"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo/options"
+
+	"github.com/apache/servicecomb-kie/server/datasource/mongo/model"
 )
 
 const revision = "revision_counter"
@@ -35,7 +37,7 @@ type Dao struct {
 
 //GetRevision return current revision number
 func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) {
-	collection := session.GetDB().Collection(session.CollectionCounter)
+	collection := mongo.GetClient().GetDB().Collection(model.CollectionCounter)
 	filter := bson.M{"name": revision, "domain": domain}
 	cur, err := collection.Find(ctx, filter)
 	if err != nil {
@@ -59,7 +61,7 @@ func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) {
 
 //ApplyRevision increase revision number and return modified value
 func (s *Dao) ApplyRevision(ctx context.Context, domain string) (int64, error) {
-	collection := session.GetDB().Collection(session.CollectionCounter)
+	collection := mongo.GetClient().GetDB().Collection(model.CollectionCounter)
 	filter := bson.M{"name": revision, "domain": domain}
 	sr := collection.FindOneAndUpdate(ctx, filter,
 		bson.D{
diff --git a/server/datasource/mongo/history/history_dao.go b/server/datasource/mongo/history/history_dao.go
index 4a08289..b8906f4 100644
--- a/server/datasource/mongo/history/history_dao.go
+++ b/server/datasource/mongo/history/history_dao.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/openlog"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
@@ -29,7 +30,7 @@ import (
 
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/server/datasource"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
 )
 
 //Dao is the implementation
@@ -53,7 +54,7 @@ func (s *Dao) GetHistory(ctx context.Context, kvID, project, domain string, opti
 }
 
 func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit int64) (*model.KVResponse, error) {
-	collection := session.GetDB().Collection(session.CollectionKVRevision)
+	collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
 	opt := options.Find().SetSort(map[string]interface{}{
 		"update_revision": -1,
 	})
@@ -95,7 +96,7 @@ func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit int64)
 
 //AddHistory add kv history
 func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error {
-	collection := session.GetDB().Collection(session.CollectionKVRevision)
+	collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
 	_, err := collection.InsertOne(ctx, kv)
 	if err != nil {
 		openlog.Error(err.Error())
@@ -112,7 +113,7 @@ func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error {
 //DelayDeletionTime add delete time to all revisions of the kv,
 //thus these revisions will be automatically deleted by TTL index.
 func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, domain string) error {
-	collection := session.GetDB().Collection(session.CollectionKVRevision)
+	collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
 	now := time.Now()
 	filter := bson.D{
 		{Key: "id", Value: bson.M{"$in": kvIDs}},
@@ -134,7 +135,7 @@ func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, do
 //historyRotate delete historical versions for a key that exceeds the limited number
 func historyRotate(ctx context.Context, kvID, project, domain string) error {
 	filter := bson.M{"id": kvID, "domain": domain, "project": project}
-	collection := session.GetDB().Collection(session.CollectionKVRevision)
+	collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
 	curTotal, err := collection.CountDocuments(ctx, filter)
 	if err != nil {
 		return err
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index a0ea991..a104450 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -18,22 +18,59 @@
 package mongo
 
 import (
+	"context"
+	"crypto/tls"
+	"fmt"
+
+	"github.com/go-chassis/cari/db"
+	dconfig "github.com/go-chassis/cari/db/config"
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/openlog"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/x/bsonx"
 
+	"github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/counter"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/history"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/kv"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	"github.com/apache/servicecomb-kie/server/datasource/mongo/model"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/track"
+	"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
 )
 
 type Broker struct {
 }
 
 func NewFrom(c *datasource.Config) (datasource.Broker, error) {
-	openlog.Info("use mongodb as storage")
-	return &Broker{}, session.Init(c)
+	kind := config.GetDB().Kind
+	openlog.Info(fmt.Sprintf("use %s as storage", kind))
+	var tlsConfig *tls.Config
+	if c.SSLEnabled {
+		var err error
+		tlsConfig, err = tlsutil.Config(c)
+		if err != nil {
+			return nil, err
+		}
+	}
+	broker := Broker{}
+	err := db.Init(&dconfig.Config{
+		Kind:       kind,
+		URI:        c.URI,
+		PoolSize:   c.PoolSize,
+		SSLEnabled: c.SSLEnabled,
+		TLSConfig:  tlsConfig,
+		Timeout:    c.Timeout,
+	})
+	if err != nil {
+		return nil, err
+	}
+	if err = ensureDB(); err != nil {
+		return nil, err
+	}
+	return &broker, err
 }
 func (*Broker) GetRevisionDao() datasource.RevisionDao {
 	return &counter.Dao{}
@@ -48,6 +85,95 @@ func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
 
+func ensureDB() error {
+	err := ensureRevisionCounter()
+	ensureKV()
+	ensureKVRevision()
+	ensureView()
+	ensureKVLongPolling()
+	return err
+}
+
+func ensureRevisionCounter() error {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{"name", "domain", "count"},
+	}
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
+	}
+	revisionCounterIndex := buildIndexDoc("name", "domain")
+	revisionCounterIndex.Options = options.Index().SetUnique(true)
+	dmongo.EnsureCollection(model.CollectionCounter, validator, []mongo.IndexModel{revisionCounterIndex})
+	_, err := dmongo.GetClient().GetDB().Collection(model.CollectionCounter).UpdateOne(context.Background(),
+		bson.M{"name": "revision_counter", "domain": "default"},
+		bson.D{
+			{Key: "$set", Value: bson.D{
+				{Key: "count", Value: 1},
+			}},
+		}, options.Update().SetUpsert(true))
+	return err
+}
+
+func ensureKV() {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{"key", "domain", "project", "id"},
+	}
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
+	}
+	kvIndex := buildIndexDoc("id")
+	kvIndex.Options = options.Index().SetUnique(true)
+	dmongo.EnsureCollection(model.CollectionKV, validator, []mongo.IndexModel{kvIndex})
+}
+
+func ensureKVRevision() {
+	kvRevisionIndex := buildIndexDoc("delete_time")
+	kvRevisionIndex.Options = options.Index().SetExpireAfterSeconds(7 * 24 * 3600)
+	dmongo.EnsureCollection(model.CollectionKVRevision, nil, []mongo.IndexModel{kvRevisionIndex})
+}
+
+func ensureView() {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{"id", "domain", "project", "display"},
+	}
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
+	}
+	viewIDIndex := buildIndexDoc("id")
+	viewIDIndex.Options = options.Index().SetUnique(true)
+	viewMultipleIndex := buildIndexDoc("display", "domain", "project")
+	viewMultipleIndex.Options = options.Index().SetUnique(true)
+	dmongo.EnsureCollection(model.CollectionView, validator, []mongo.IndexModel{viewIDIndex, viewMultipleIndex})
+}
+
+func ensureKVLongPolling() {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{"id", "revision", "session_id", "url_path"},
+	}
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
+	}
+	timestampIndex := buildIndexDoc("timestamp")
+	timestampIndex.Options = options.Index().SetExpireAfterSeconds(7 * 24 * 3600)
+	kvLongPollingIndex := buildIndexDoc("revision", "domain", "session_id")
+	kvLongPollingIndex.Options = options.Index().SetUnique(true)
+	dmongo.EnsureCollection(model.CollectionPollingDetail, validator, []mongo.IndexModel{timestampIndex, kvLongPollingIndex})
+}
+
+func buildIndexDoc(keys ...string) mongo.IndexModel {
+	keysDoc := bsonx.Doc{}
+	for _, key := range keys {
+		keysDoc = keysDoc.Append(key, bsonx.Int32(1))
+	}
+	index := mongo.IndexModel{
+		Keys: keysDoc,
+	}
+	return index
+}
 func init() {
 	datasource.RegisterPlugin("mongo", NewFrom)
 }
diff --git a/server/datasource/mongo/kv/kv_dao.go b/server/datasource/mongo/kv/kv_dao.go
index d44dd5f..1338fae 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -23,6 +23,7 @@ import (
 	"regexp"
 	"strings"
 
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/cari/sync"
 	"github.com/go-chassis/openlog"
 	"go.mongodb.org/mongo-driver/bson"
@@ -32,7 +33,7 @@ import (
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/pkg/util"
 	"github.com/apache/servicecomb-kie/server/datasource"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
 )
 
 const (
@@ -54,7 +55,7 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource
 }
 
 func create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	_, err := collection.InsertOne(ctx, kv)
 	if err != nil {
 		openlog.Error("create error", openlog.WithTags(openlog.Tags{
@@ -68,7 +69,7 @@ func create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
 
 // txnCreate is to start transaction when creating KV, will create task in a transaction operation
 func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
-	taskSession, err := session.GetDB().Client().StartSession()
+	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
 	if err != nil {
 		return nil, err
 	}
@@ -77,7 +78,7 @@ func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
 	}
 	defer taskSession.EndSession(ctx)
 	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
-		collection := session.GetDB().Collection(session.CollectionKV)
+		collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 		_, err = collection.InsertOne(sessionContext, kv)
 		if err != nil {
 			openlog.Error("create error", openlog.WithTags(openlog.Tags{
@@ -105,7 +106,7 @@ func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
 			}
 			return err
 		}
-		collection = session.GetDB().Collection(session.CollectionTask)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
 		_, err = collection.InsertOne(sessionContext, task)
 		if err != nil {
 			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -143,7 +144,7 @@ func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource
 }
 
 func update(ctx context.Context, kv *model.KVDoc) error {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	_, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{
 		{Key: "$set", Value: bson.D{
 			{Key: "value", Value: kv.Value},
@@ -161,7 +162,7 @@ func update(ctx context.Context, kv *model.KVDoc) error {
 
 // txnUpdate is to start transaction when updating kV, will create task in a transaction operation
 func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
-	taskSession, err := session.GetDB().Client().StartSession()
+	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
 	if err != nil {
 		return err
 	}
@@ -170,7 +171,7 @@ func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
 	}
 	defer taskSession.EndSession(ctx)
 	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
-		collection := session.GetDB().Collection(session.CollectionKV)
+		collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 		result := collection.FindOneAndUpdate(sessionContext, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{
 			{Key: "$set", Value: bson.D{
 				{Key: "value", Value: kv.Value},
@@ -218,7 +219,7 @@ func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
 			}
 			return err
 		}
-		collection = session.GetDB().Collection(session.CollectionTask)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
 		_, err = collection.InsertOne(sessionContext, task)
 		if err != nil {
 			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -253,7 +254,7 @@ func getValue(str string) string {
 }
 
 func findKV(ctx context.Context, domain string, project string, opts datasource.FindOptions) (*mongo.Cursor, int, error) {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
 	defer cancel()
 
@@ -308,7 +309,7 @@ func findKV(ctx context.Context, domain string, project string, opts datasource.
 	return cur, int(curTotal), err
 }
 func findOneKey(ctx context.Context, filter bson.M) ([]*model.KVDoc, error) {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	sr := collection.FindOne(ctx, filter)
 	if sr.Err() != nil {
 		if sr.Err() == mongo.ErrNoDocuments {
@@ -371,7 +372,7 @@ func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string
 
 func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
 	curKV := &model.KVDoc{}
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	sr := collection.FindOneAndDelete(ctx, bson.M{"id": kvID, "project": project, "domain": domain})
 	if sr.Err() != nil {
 		if sr.Err() == mongo.ErrNoDocuments {
@@ -390,7 +391,7 @@ func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model
 // txnFindOneAndDelete is to start transaction when delete KV, will create task and tombstone in a transaction operation
 func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
 	curKV := &model.KVDoc{}
-	taskSession, err := session.GetDB().Client().StartSession()
+	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
 	if err != nil {
 		openlog.Error("fail to start session" + err.Error())
 		return nil, err
@@ -401,7 +402,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
 	}
 	defer taskSession.EndSession(ctx)
 	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
-		collection := session.GetDB().Collection(session.CollectionKV)
+		collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 		sr := collection.FindOneAndDelete(sessionContext, bson.M{"id": kvID, "project": project, "domain": domain})
 		if sr.Err() != nil {
 			errAbort := taskSession.AbortTransaction(sessionContext)
@@ -441,7 +442,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
 			}
 			return err
 		}
-		collection = session.GetDB().Collection(session.CollectionTask)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
 		_, err = collection.InsertOne(sessionContext, task)
 		if err != nil {
 			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -458,7 +459,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
 			return err
 		}
 		tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(curKV))
-		collection = session.GetDB().Collection(session.CollectionTombstone)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone)
 		_, err = collection.InsertOne(sessionContext, tombstone)
 		if err != nil {
 			openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
@@ -507,7 +508,7 @@ func findManyAndDelete(ctx context.Context, kvIDs []string, project, domain stri
 		}
 		return nil, 0, err
 	}
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	dr, err := collection.DeleteMany(ctx, filter)
 	if err != nil {
 		openlog.Error(fmt.Sprintf("delete kvs [%v] failed : [%v]", kvIDs, err))
@@ -530,7 +531,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
 		return nil, 0, err
 	}
 	var deletedCount int64
-	taskSession, err := session.GetDB().Client().StartSession()
+	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
 	if err != nil {
 		openlog.Error("fail to start session" + err.Error())
 		return nil, 0, err
@@ -542,7 +543,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
 	defer taskSession.EndSession(ctx)
 
 	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
-		collection := session.GetDB().Collection(session.CollectionKV)
+		collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 		filter := bson.D{
 			{Key: "id", Value: bson.M{"$in": kvIDs}},
 			{Key: "project", Value: project},
@@ -569,7 +570,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
 			tasksDoc[i] = task
 			tombstonesDoc[i] = tombstone
 		}
-		collection = session.GetDB().Collection(session.CollectionTask)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
 		_, err = collection.InsertMany(sessionContext, tasksDoc)
 		if err != nil {
 			openlog.Error("create tasks error", openlog.WithTags(openlog.Tags{
@@ -583,7 +584,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
 			}
 			return err
 		}
-		collection = session.GetDB().Collection(session.CollectionTombstone)
+		collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone)
 		_, err = collection.InsertMany(sessionContext, tombstonesDoc)
 		if err != nil {
 			openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
@@ -609,7 +610,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
 }
 
 func findKeys(ctx context.Context, filter interface{}, withoutLabel bool) ([]*model.KVDoc, error) {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	cur, err := collection.Find(ctx, filter)
 	if err != nil {
 		if err.Error() == context.DeadlineExceeded.Error() {
@@ -666,7 +667,7 @@ func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, e
 }
 
 func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) {
-	collection := session.GetDB().Collection(session.CollectionKV)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
 	filter := bson.M{"domain": domain, "project": project}
 	total, err := collection.CountDocuments(ctx, filter)
 	if err != nil {
diff --git a/server/datasource/mongo/model/model.go b/server/datasource/mongo/model/model.go
new file mode 100644
index 0000000..4110e7b
--- /dev/null
+++ b/server/datasource/mongo/model/model.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+const (
+	CollectionKV            = "kv"
+	CollectionKVRevision    = "kv_revision"
+	CollectionPollingDetail = "polling_detail"
+	CollectionCounter       = "counter"
+	CollectionView          = "view"
+	CollectionTask          = "task"
+	CollectionTombstone     = "tombstone"
+)
diff --git a/server/datasource/mongo/session/session.go b/server/datasource/mongo/session/session.go
deleted file mode 100644
index e501b92..0000000
--- a/server/datasource/mongo/session/session.go
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//Package session manage db connection
-package session
-
-import (
-	"context"
-	"crypto/tls"
-	"errors"
-	"reflect"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/go-chassis/openlog"
-	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/bson/bsoncodec"
-	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/options"
-	"gopkg.in/mgo.v2"
-
-	"github.com/apache/servicecomb-kie/pkg/cipherutil"
-	"github.com/apache/servicecomb-kie/pkg/model"
-	"github.com/apache/servicecomb-kie/server/datasource"
-	"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
-)
-
-//const for db name and collection name
-const (
-	DBName = "servicecomb"
-
-	CollectionLabel         = "label"
-	CollectionKV            = "kv"
-	CollectionKVRevision    = "kv_revision"
-	CollectionPollingDetail = "polling_detail"
-	CollectionCounter       = "counter"
-	CollectionView          = "view"
-	CollectionTask          = "task"
-	CollectionTombstone     = "tombstone"
-)
-
-//db errors
-var (
-	ErrMissingDomain  = errors.New("domain info missing, illegal access")
-	ErrMissingProject = errors.New("project info missing, illegal access")
-	ErrLabelNotExists = errors.New("labels does not exits")
-
-	ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result")
-
-	ErrIDIsNil  = errors.New("id is empty")
-	ErrKeyIsNil = errors.New("key must not be empty")
-
-	ErrViewCreation = errors.New("can not create view")
-	ErrViewUpdate   = errors.New("can not update view")
-	ErrViewDelete   = errors.New("can not delete view")
-	ErrViewNotExist = errors.New("view not exists")
-	ErrViewFinding  = errors.New("view search error")
-	ErrGetPipeline  = errors.New("can not get criteria")
-)
-
-const (
-	MsgDBExists  = "already exists"
-	MsgDuplicate = "duplicate key error collection"
-)
-
-var client *mongo.Client
-var once sync.Once
-var db *mongo.Database
-
-//Init prepare params
-func Init(c *datasource.Config) error {
-	var err error
-	once.Do(func() {
-		sc, _ := bsoncodec.NewStructCodec(bsoncodec.DefaultStructTagParser)
-		reg := bson.NewRegistryBuilder().
-			RegisterTypeEncoder(reflect.TypeOf(model.LabelDoc{}), sc).
-			RegisterTypeEncoder(reflect.TypeOf(model.KVDoc{}), sc).
-			Build()
-		uri := cipherutil.TryDecrypt(c.URI)
-		clientOps := []*options.ClientOptions{options.Client().ApplyURI(uri)}
-		if c.SSLEnabled {
-			var tc *tls.Config
-			tc, err = tlsutil.Config(c)
-			if err != nil {
-				return
-			}
-			clientOps = append(clientOps, options.Client().SetTLSConfig(tc))
-			openlog.Info("enabled ssl communication to mongodb")
-		}
-		client, err = mongo.NewClient(clientOps...)
-		if err != nil {
-			return
-		}
-		openlog.Info("DB connecting")
-
-		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-		defer cancel()
-
-		err = client.Connect(ctx)
-		if err != nil {
-			return
-		}
-		openlog.Info("DB connected")
-		db = client.Database(DBName, &options.DatabaseOptions{
-			Registry: reg,
-		})
-
-	})
-	EnsureDB(c)
-	return nil
-}
-
-//GetDB get mongo db client
-func GetDB() *mongo.Database {
-	return db
-}
-
-//CreateView run mongo db command to create view
-func CreateView(ctx context.Context, view, source string, pipeline mongo.Pipeline) error {
-	sr := GetDB().RunCommand(ctx,
-		bson.D{
-			{Key: "create", Value: view},
-			{Key: "viewOn", Value: source},
-			{Key: "pipeline", Value: pipeline},
-		})
-	if sr.Err() != nil {
-		openlog.Error("can not create view: " + sr.Err().Error())
-		return ErrViewCreation
-	}
-	return nil
-}
-
-//DropView deletes view
-func DropView(ctx context.Context, view string) error {
-	err := GetDB().Collection(view).Drop(ctx)
-	if err != nil {
-		openlog.Error(err.Error())
-		return err
-	}
-	return nil
-}
-
-//GetColInfo get collection info
-func GetColInfo(ctx context.Context, name string) (*CollectionInfo, error) {
-	cur, err := GetDB().ListCollections(ctx, bson.M{"name": name, "type": "view"})
-	if err != nil {
-		openlog.Error(err.Error())
-		return nil, ErrGetPipeline
-	}
-	defer cur.Close(ctx)
-	if !cur.Next(ctx) {
-		return nil, ErrGetPipeline
-	}
-	openlog.Debug(cur.Current.String())
-	c := &CollectionInfo{}
-	if err := cur.Decode(c); err != nil {
-		openlog.Error(err.Error())
-		return nil, ErrGetPipeline
-	}
-	return c, nil
-}
-
-//EnsureDB build mongo db schema
-func EnsureDB(c *datasource.Config) {
-	session := OpenSession(c)
-	defer session.Close()
-	session.SetMode(mgo.Primary, true)
-
-	ensureRevisionCounter(session)
-
-	ensureKV(session)
-
-	ensureKVRevision(session)
-
-	ensureView(session)
-
-	ensureKVLongPolling(session)
-
-}
-
-func OpenSession(c *datasource.Config) *mgo.Session {
-	timeout := c.Timeout
-	var uri string
-	var err error
-	uri = cipherutil.TryDecrypt(c.URI)
-	session, err := mgo.DialWithTimeout(uri, timeout)
-	if err != nil {
-		openlog.Warn("can not dial db, retry once:" + err.Error())
-		session, err = mgo.DialWithTimeout(uri, timeout)
-		if err != nil {
-			openlog.Fatal("can not dial db:" + err.Error())
-		}
-	}
-	return session
-}
-
-func ensureKVLongPolling(session *mgo.Session) {
-	c := session.DB(DBName).C(CollectionPollingDetail)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		"id":         bson.M{"$exists": true},
-		"revision":   bson.M{"$exists": true},
-		"session_id": bson.M{"$exists": true},
-		"url_path":   bson.M{"$exists": true},
-	}})
-	wrapError(err, MsgDBExists)
-	err = c.EnsureIndex(mgo.Index{
-		Key:         []string{"timestamp"},
-		ExpireAfter: 7 * 24 * time.Hour,
-	})
-	wrapError(err)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"revision", "domain", "session_id"},
-		Unique: true,
-	})
-	wrapError(err)
-}
-
-func ensureView(session *mgo.Session) {
-	c := session.DB(DBName).C(CollectionView)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		"id":      bson.M{"$exists": true},
-		"domain":  bson.M{"$exists": true},
-		"project": bson.M{"$exists": true},
-		"display": bson.M{"$exists": true},
-	}})
-	wrapError(err, MsgDBExists)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"id"},
-		Unique: true,
-	})
-	wrapError(err)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"display", "domain", "project"},
-		Unique: true,
-	})
-	wrapError(err)
-}
-
-func ensureKVRevision(session *mgo.Session) {
-	c := session.DB(DBName).C(CollectionKVRevision)
-	err := c.EnsureIndex(mgo.Index{
-		Key:         []string{"delete_time"},
-		ExpireAfter: 7 * 24 * time.Hour,
-	})
-	wrapError(err, MsgDBExists)
-}
-
-func ensureKV(session *mgo.Session) {
-	c := session.DB(DBName).C(CollectionKV)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		"key":     bson.M{"$exists": true},
-		"domain":  bson.M{"$exists": true},
-		"project": bson.M{"$exists": true},
-		"id":      bson.M{"$exists": true},
-	}})
-	wrapError(err, MsgDBExists)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"id"},
-		Unique: true,
-	})
-	wrapError(err)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"key", "label_format", "domain", "project"},
-		Unique: true,
-	})
-	wrapError(err)
-}
-
-func ensureRevisionCounter(session *mgo.Session) {
-	c := session.DB(DBName).C(CollectionCounter)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		"name":   bson.M{"$exists": true},
-		"domain": bson.M{"$exists": true},
-		"count":  bson.M{"$exists": true},
-	}})
-	wrapError(err, MsgDBExists)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{"name", "domain"},
-		Unique: true,
-	})
-	wrapError(err)
-	docs := map[string]interface{}{"name": "revision_counter", "count": 1, "domain": "default"}
-	err = c.Insert(docs)
-	wrapError(err, MsgDuplicate)
-}
-
-func wrapError(err error, skipMsg ...string) {
-	if err != nil {
-		for _, str := range skipMsg {
-			if strings.Contains(err.Error(), str) {
-				openlog.Debug(err.Error())
-				return
-			}
-		}
-		openlog.Fatal(err.Error())
-	}
-}
diff --git a/server/datasource/mongo/session/session_test.go b/server/datasource/mongo/session/session_test.go
deleted file mode 100644
index c5eaee1..0000000
--- a/server/datasource/mongo/session/session_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package session_test
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/apache/servicecomb-kie/server/datasource"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
-	_ "github.com/apache/servicecomb-kie/test"
-	"github.com/stretchr/testify/assert"
-	"go.mongodb.org/mongo-driver/bson"
-)
-
-func TestGetColInfo(t *testing.T) {
-	var err error
-	err = session.Init(&datasource.Config{
-		URI:     "mongodb://kie:123@127.0.0.1:27017/servicecomb",
-		Timeout: 10 * time.Second,
-	})
-	assert.NoError(t, err)
-	err = session.CreateView(context.Background(), "test_view", session.CollectionKV, []bson.D{
-		{{
-			"$match",
-			bson.D{{"domain", "default"}, {"project", "default"}},
-		}},
-	})
-	assert.NoError(t, err)
-	c, err := session.GetColInfo(context.Background(), "test_view")
-	assert.NoError(t, err)
-	assert.Equal(t, "default", c.Options.Pipeline[0]["$match"]["domain"])
-	err = session.DropView(context.Background(), "test_view")
-	assert.NoError(t, err)
-}
diff --git a/server/datasource/mongo/session/struct.go b/server/datasource/mongo/session/struct.go
deleted file mode 100644
index 44a5c62..0000000
--- a/server/datasource/mongo/session/struct.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package session
-
-//CollectionInfo is struct
-type CollectionInfo struct {
-	Options Options `json:"options"`
-}
-
-//Options is struct
-type Options struct {
-	Pipeline []map[string]map[string]string `json:"pipeline"`
-}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go b/server/datasource/mongo/track/polling_detail_dao.go
index 5c51e67..27ceae8 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -20,13 +20,15 @@ package track
 import (
 	"context"
 
-	"github.com/apache/servicecomb-kie/pkg/model"
-	"github.com/apache/servicecomb-kie/server/datasource"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/openlog"
 	"github.com/gofrs/uuid"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
+
+	"github.com/apache/servicecomb-kie/pkg/model"
+	"github.com/apache/servicecomb-kie/server/datasource"
+	mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
 )
 
 //Dao is the implementation
@@ -36,7 +38,7 @@ type Dao struct {
 //CreateOrUpdate create a record or update exist record
 //If revision and session_id exists then update else insert
 func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (*model.PollingDetail, error) {
-	collection := session.GetDB().Collection(session.CollectionPollingDetail)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionPollingDetail)
 	queryFilter := bson.M{"revision": detail.Revision, "domain": detail.Domain, "session_id": detail.SessionID}
 	res := collection.FindOne(ctx, queryFilter)
 	if res.Err() != nil {
@@ -63,7 +65,7 @@ func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (
 
 //Get is to get a track data
 func (s *Dao) GetPollingDetail(ctx context.Context, detail *model.PollingDetail) ([]*model.PollingDetail, error) {
-	collection := session.GetDB().Collection(session.CollectionPollingDetail)
+	collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionPollingDetail)
 	queryFilter := bson.M{"domain": detail.Domain}
 	if detail.SessionID != "" {
 		queryFilter["session_id"] = detail.SessionID
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 225abbc..4baff07 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -35,7 +35,6 @@ import (
 
 	"github.com/apache/servicecomb-kie/pkg/common"
 	"github.com/apache/servicecomb-kie/pkg/model"
-	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
 	"github.com/apache/servicecomb-kie/server/pubsub"
 	goRestful "github.com/emicklei/go-restful"
 	"github.com/go-chassis/cari/config"
@@ -69,6 +68,10 @@ func NewObserver() (*pubsub.Observer, error) {
 //err
 var (
 	ErrInvalidRev = errors.New(common.MsgInvalidRev)
+
+	ErrMissingDomain  = errors.New("domain info missing, illegal access")
+	ErrMissingProject = errors.New("project info missing, illegal access")
+	ErrIDIsNil        = errors.New("id is empty")
 )
 
 //ReadClaims get auth info
@@ -242,7 +245,7 @@ func checkPagination(offsetStr, limitStr string) (int64, int64, error) {
 
 func validateGet(domain, project, kvID string) error {
 	if kvID == "" {
-		return session.ErrIDIsNil
+		return ErrIDIsNil
 	}
 	return checkDomainAndProject(domain, project)
 }
@@ -257,10 +260,10 @@ func validateDeleteList(domain, project string) error {
 
 func checkDomainAndProject(domain, project string) error {
 	if domain == "" {
-		return session.ErrMissingDomain
+		return ErrMissingDomain
 	}
 	if project == "" {
-		return session.ErrMissingProject
+		return ErrMissingProject
 	}
 	return nil
 }
diff --git a/test/init.go b/test/init.go
index 7732267..489fd19 100644
--- a/test/init.go
+++ b/test/init.go
@@ -18,23 +18,21 @@
 package test
 
 import (
-	"time"
+	"github.com/go-chassis/go-archaius"
+	"github.com/go-chassis/go-chassis/v2/security/cipher"
 
 	"github.com/apache/servicecomb-kie/pkg/validator"
 	"github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
 	edatasource "github.com/apache/servicecomb-service-center/eventbase/datasource"
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/go-archaius"
-	"github.com/go-chassis/go-chassis/v2/security/cipher"
+
+	_ "github.com/go-chassis/cari/db/bootstrap"
 
 	_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
 	_ "github.com/apache/servicecomb-kie/server/datasource/mongo"
 	_ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
 	_ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
 	_ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
-	_ "github.com/little-cui/etcdadpt/embedded"
-	_ "github.com/little-cui/etcdadpt/remote"
 )
 
 var (
@@ -75,16 +73,9 @@ func init() {
 	if err != nil {
 		panic(err)
 	}
-	if kind != "embedded_etcd" {
-		eventbaseDBCfg := db.Config{
-			URI:     uri,
-			Kind:    kind,
-			Timeout: 10 * time.Second,
-		}
-		err = edatasource.Init(eventbaseDBCfg)
-		if err != nil {
-			panic(err)
-		}
+	err = edatasource.Init(kind)
+	if err != nil {
+		panic(err)
 	}
 }