You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/21 06:49:13 UTC
[servicecomb-kie] branch master updated: [fix] update etcdadpt and cari in go.mod (#241)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 640c5ac [fix] update etcdadpt and cari in go.mod (#241)
640c5ac is described below
commit 640c5ac2365741ba113d2585ca7b5cb8fd1703e5
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 21 14:49:05 2022 +0800
[fix] update etcdadpt and cari in go.mod (#241)
---
cmd/kieserver/main.go | 3 +
go.mod | 7 +-
go.sum | 15 +-
server/datasource/etcd/init.go | 20 +-
server/datasource/mongo/counter/revision.go | 8 +-
server/datasource/mongo/history/history_dao.go | 11 +-
server/datasource/mongo/init.go | 132 ++++++++-
server/datasource/mongo/kv/kv_dao.go | 47 ++--
server/datasource/mongo/model/model.go | 28 ++
server/datasource/mongo/session/session.go | 312 ---------------------
server/datasource/mongo/session/session_test.go | 34 ---
server/datasource/mongo/session/struct.go | 11 -
.../datasource/mongo/track/polling_detail_dao.go | 12 +-
server/resource/v1/common.go | 11 +-
test/init.go | 23 +-
15 files changed, 235 insertions(+), 439 deletions(-)
diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go
index bc717a2..2572f08 100644
--- a/cmd/kieserver/main.go
+++ b/cmd/kieserver/main.go
@@ -29,6 +29,9 @@ import (
_ "github.com/go-chassis/go-chassis/v2/middleware/monitoring"
_ "github.com/go-chassis/go-chassis/v2/middleware/ratelimiter"
+ // cari db
+ _ "github.com/go-chassis/cari/db/bootstrap"
+
//storage
_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
_ "github.com/apache/servicecomb-kie/server/datasource/mongo"
diff --git a/go.mod b/go.mod
index 9179e01..de2a2be 100644
--- a/go.mod
+++ b/go.mod
@@ -1,9 +1,9 @@
module github.com/apache/servicecomb-kie
require (
- github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd
+ github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca
github.com/emicklei/go-restful v2.12.0+incompatible
- github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6
+ github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d
github.com/go-chassis/foundation v0.4.0
github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b
github.com/go-chassis/go-chassis/v2 v2.3.1-0.20211217084436-360a6a6a0ef3
@@ -11,11 +11,10 @@ require (
github.com/go-chassis/seclog v1.3.1-0.20210917082355-52c40864f240
github.com/gofrs/uuid v4.0.0+incompatible
github.com/hashicorp/serf v0.9.5
- github.com/little-cui/etcdadpt v0.3.1
+ github.com/little-cui/etcdadpt v0.3.2
github.com/stretchr/testify v1.7.0
github.com/urfave/cli v1.22.4
go.mongodb.org/mongo-driver v1.4.6
- gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v2 v2.4.0
)
diff --git a/go.sum b/go.sum
index 7024637..ad81b04 100644
--- a/go.sum
+++ b/go.sum
@@ -40,8 +40,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd h1:U1NiPGnnbZML/kOY52v6BCh1SlXrcsAnzZmj1IWAgTk=
-github.com/apache/servicecomb-service-center/eventbase v0.0.0-20211230015739-512a9cc7b4cd/go.mod h1:B/u1cq2IHLrbUTfC5jTM8rQFhJ7XZhmemqMWR0CGDHo=
+github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca h1:e4NoNmQYa7y0K+jJSbvQHsQ+mVI1ThBk/pP21MbFblQ=
+github.com/apache/servicecomb-service-center/eventbase v0.0.0-20220120070230-26997eb876ca/go.mod h1:eClC23tfQ0Q4vdesWnBeFfEyf58XyEQhoWki3+HSQ9s=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
@@ -146,13 +146,11 @@ github.com/go-chassis/cari v0.0.0-20201210041921-7b6fbef2df11/go.mod h1:MgtsEI0A
github.com/go-chassis/cari v0.4.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
github.com/go-chassis/cari v0.5.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
github.com/go-chassis/cari v0.5.1-0.20210823023004-74041d1363c4/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
-github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6 h1:7Ino94E57cnvKmyKVR0bDhZl/jcI+U2VMsHUD6qAvwg=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
+github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d h1:RtBn1T7KmJM1j1+NlBFqaKJWPWPDde9adDQMFHCKMbU=
+github.com/go-chassis/cari v0.5.1-0.20220119150556-8ae374a2649d/go.mod h1:tKTzguHTGohMCgkcWNZWtA4TwfcsJrIXpfYxsQtb7uw=
github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
github.com/go-chassis/foundation v0.2.2/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
github.com/go-chassis/foundation v0.3.0/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
-github.com/go-chassis/foundation v0.3.1-0.20210806081520-3bd92d1ef787/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
github.com/go-chassis/foundation v0.4.0 h1:z0xETnSxF+vRXWjoIhOdzt6rywjZ4sB++utEl4YgWEY=
github.com/go-chassis/foundation v0.4.0/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
github.com/go-chassis/go-archaius v1.5.1/go.mod h1:QPwvvtBxvwiC48rmydoAqxopqOr93RCQ6syWsIkXPXQ=
@@ -396,9 +394,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
-github.com/little-cui/etcdadpt v0.2.1/go.mod h1:727wftF2FS4vfkgFLmIvQue1XH+9u4lK2/hd6L7OAC8=
-github.com/little-cui/etcdadpt v0.3.1 h1:lAPIffcOR6jROu/mWf+zHscV8urIu1qbsJvwvziLWDY=
-github.com/little-cui/etcdadpt v0.3.1/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
+github.com/little-cui/etcdadpt v0.3.2 h1:EBXPBxddZXTgWvGsIdAqqG6JCu1TouPNUhVVj9swt/s=
+github.com/little-cui/etcdadpt v0.3.2/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index 622a314..8f0524a 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -21,9 +21,9 @@ import (
"crypto/tls"
"fmt"
- // support embedded etcd
- _ "github.com/little-cui/etcdadpt/embedded"
- _ "github.com/little-cui/etcdadpt/remote"
+ "github.com/go-chassis/cari/db"
+ dconfig "github.com/go-chassis/cari/db/config"
+ "github.com/go-chassis/openlog"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
@@ -32,8 +32,6 @@ import (
"github.com/apache/servicecomb-kie/server/datasource/etcd/kv"
"github.com/apache/servicecomb-kie/server/datasource/etcd/track"
"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
- "github.com/go-chassis/openlog"
- "github.com/little-cui/etcdadpt"
)
type Broker struct {
@@ -50,11 +48,13 @@ func NewFrom(c *datasource.Config) (datasource.Broker, error) {
return nil, err
}
}
- return &Broker{}, etcdadpt.Init(etcdadpt.Config{
- Kind: kind,
- ClusterAddresses: c.URI,
- SslEnabled: c.SSLEnabled,
- TLSConfig: tlsConfig,
+ return &Broker{}, db.Init(&dconfig.Config{
+ Kind: kind,
+ URI: c.URI,
+ PoolSize: c.PoolSize,
+ SSLEnabled: c.SSLEnabled,
+ TLSConfig: tlsConfig,
+ Timeout: c.Timeout,
})
}
func (*Broker) GetRevisionDao() datasource.RevisionDao {
diff --git a/server/datasource/mongo/counter/revision.go b/server/datasource/mongo/counter/revision.go
index 1622b61..1fb42bb 100644
--- a/server/datasource/mongo/counter/revision.go
+++ b/server/datasource/mongo/counter/revision.go
@@ -21,10 +21,12 @@ import (
"context"
"errors"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/openlog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
+
+ "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
)
const revision = "revision_counter"
@@ -35,7 +37,7 @@ type Dao struct {
//GetRevision return current revision number
func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) {
- collection := session.GetDB().Collection(session.CollectionCounter)
+ collection := mongo.GetClient().GetDB().Collection(model.CollectionCounter)
filter := bson.M{"name": revision, "domain": domain}
cur, err := collection.Find(ctx, filter)
if err != nil {
@@ -59,7 +61,7 @@ func (s *Dao) GetRevision(ctx context.Context, domain string) (int64, error) {
//ApplyRevision increase revision number and return modified value
func (s *Dao) ApplyRevision(ctx context.Context, domain string) (int64, error) {
- collection := session.GetDB().Collection(session.CollectionCounter)
+ collection := mongo.GetClient().GetDB().Collection(model.CollectionCounter)
filter := bson.M{"name": revision, "domain": domain}
sr := collection.FindOneAndUpdate(ctx, filter,
bson.D{
diff --git a/server/datasource/mongo/history/history_dao.go b/server/datasource/mongo/history/history_dao.go
index 4a08289..b8906f4 100644
--- a/server/datasource/mongo/history/history_dao.go
+++ b/server/datasource/mongo/history/history_dao.go
@@ -22,6 +22,7 @@ import (
"fmt"
"time"
+ "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/openlog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -29,7 +30,7 @@ import (
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
)
//Dao is the implementation
@@ -53,7 +54,7 @@ func (s *Dao) GetHistory(ctx context.Context, kvID, project, domain string, opti
}
func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit int64) (*model.KVResponse, error) {
- collection := session.GetDB().Collection(session.CollectionKVRevision)
+ collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
opt := options.Find().SetSort(map[string]interface{}{
"update_revision": -1,
})
@@ -95,7 +96,7 @@ func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit int64)
//AddHistory add kv history
func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error {
- collection := session.GetDB().Collection(session.CollectionKVRevision)
+ collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
_, err := collection.InsertOne(ctx, kv)
if err != nil {
openlog.Error(err.Error())
@@ -112,7 +113,7 @@ func (s *Dao) AddHistory(ctx context.Context, kv *model.KVDoc) error {
//DelayDeletionTime add delete time to all revisions of the kv,
//thus these revisions will be automatically deleted by TTL index.
func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, domain string) error {
- collection := session.GetDB().Collection(session.CollectionKVRevision)
+ collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
now := time.Now()
filter := bson.D{
{Key: "id", Value: bson.M{"$in": kvIDs}},
@@ -134,7 +135,7 @@ func (s *Dao) DelayDeletionTime(ctx context.Context, kvIDs []string, project, do
//historyRotate delete historical versions for a key that exceeds the limited number
func historyRotate(ctx context.Context, kvID, project, domain string) error {
filter := bson.M{"id": kvID, "domain": domain, "project": project}
- collection := session.GetDB().Collection(session.CollectionKVRevision)
+ collection := mongo.GetClient().GetDB().Collection(mmodel.CollectionKVRevision)
curTotal, err := collection.CountDocuments(ctx, filter)
if err != nil {
return err
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index a0ea991..a104450 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -18,22 +18,59 @@
package mongo
import (
+ "context"
+ "crypto/tls"
+ "fmt"
+
+ "github.com/go-chassis/cari/db"
+ dconfig "github.com/go-chassis/cari/db/config"
+ dmongo "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/openlog"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
+ "go.mongodb.org/mongo-driver/x/bsonx"
+ "github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/mongo/counter"
"github.com/apache/servicecomb-kie/server/datasource/mongo/history"
"github.com/apache/servicecomb-kie/server/datasource/mongo/kv"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
"github.com/apache/servicecomb-kie/server/datasource/mongo/track"
+ "github.com/apache/servicecomb-kie/server/datasource/tlsutil"
)
type Broker struct {
}
func NewFrom(c *datasource.Config) (datasource.Broker, error) {
- openlog.Info("use mongodb as storage")
- return &Broker{}, session.Init(c)
+ kind := config.GetDB().Kind
+ openlog.Info(fmt.Sprintf("use %s as storage", kind))
+ var tlsConfig *tls.Config
+ if c.SSLEnabled {
+ var err error
+ tlsConfig, err = tlsutil.Config(c)
+ if err != nil {
+ return nil, err
+ }
+ }
+ broker := Broker{}
+ err := db.Init(&dconfig.Config{
+ Kind: kind,
+ URI: c.URI,
+ PoolSize: c.PoolSize,
+ SSLEnabled: c.SSLEnabled,
+ TLSConfig: tlsConfig,
+ Timeout: c.Timeout,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if err = ensureDB(); err != nil {
+ return nil, err
+ }
+ return &broker, err
}
func (*Broker) GetRevisionDao() datasource.RevisionDao {
return &counter.Dao{}
@@ -48,6 +85,95 @@ func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}
+func ensureDB() error {
+ err := ensureRevisionCounter()
+ ensureKV()
+ ensureKVRevision()
+ ensureView()
+ ensureKVLongPolling()
+ return err
+}
+
+func ensureRevisionCounter() error {
+ jsonSchema := bson.M{
+ "bsonType": "object",
+ "required": []string{"name", "domain", "count"},
+ }
+ validator := bson.M{
+ "$jsonSchema": jsonSchema,
+ }
+ revisionCounterIndex := buildIndexDoc("name", "domain")
+ revisionCounterIndex.Options = options.Index().SetUnique(true)
+ dmongo.EnsureCollection(model.CollectionCounter, validator, []mongo.IndexModel{revisionCounterIndex})
+ _, err := dmongo.GetClient().GetDB().Collection(model.CollectionCounter).UpdateOne(context.Background(),
+ bson.M{"name": "revision_counter", "domain": "default"},
+ bson.D{
+ {Key: "$set", Value: bson.D{
+ {Key: "count", Value: 1},
+ }},
+ }, options.Update().SetUpsert(true))
+ return err
+}
+
+func ensureKV() {
+ jsonSchema := bson.M{
+ "bsonType": "object",
+ "required": []string{"key", "domain", "project", "id"},
+ }
+ validator := bson.M{
+ "$jsonSchema": jsonSchema,
+ }
+ kvIndex := buildIndexDoc("id")
+ kvIndex.Options = options.Index().SetUnique(true)
+ dmongo.EnsureCollection(model.CollectionKV, validator, []mongo.IndexModel{kvIndex})
+}
+
+func ensureKVRevision() {
+ kvRevisionIndex := buildIndexDoc("delete_time")
+ kvRevisionIndex.Options = options.Index().SetExpireAfterSeconds(7 * 24 * 3600)
+ dmongo.EnsureCollection(model.CollectionKVRevision, nil, []mongo.IndexModel{kvRevisionIndex})
+}
+
+func ensureView() {
+ jsonSchema := bson.M{
+ "bsonType": "object",
+ "required": []string{"id", "domain", "project", "display"},
+ }
+ validator := bson.M{
+ "$jsonSchema": jsonSchema,
+ }
+ viewIDIndex := buildIndexDoc("id")
+ viewIDIndex.Options = options.Index().SetUnique(true)
+ viewMultipleIndex := buildIndexDoc("display", "domain", "project")
+ viewMultipleIndex.Options = options.Index().SetUnique(true)
+ dmongo.EnsureCollection(model.CollectionView, validator, []mongo.IndexModel{viewIDIndex, viewMultipleIndex})
+}
+
+func ensureKVLongPolling() {
+ jsonSchema := bson.M{
+ "bsonType": "object",
+ "required": []string{"id", "revision", "session_id", "url_path"},
+ }
+ validator := bson.M{
+ "$jsonSchema": jsonSchema,
+ }
+ timestampIndex := buildIndexDoc("timestamp")
+ timestampIndex.Options = options.Index().SetExpireAfterSeconds(7 * 24 * 3600)
+ kvLongPollingIndex := buildIndexDoc("revision", "domain", "session_id")
+ kvLongPollingIndex.Options = options.Index().SetUnique(true)
+ dmongo.EnsureCollection(model.CollectionPollingDetail, validator, []mongo.IndexModel{timestampIndex, kvLongPollingIndex})
+}
+
+func buildIndexDoc(keys ...string) mongo.IndexModel {
+ keysDoc := bsonx.Doc{}
+ for _, key := range keys {
+ keysDoc = keysDoc.Append(key, bsonx.Int32(1))
+ }
+ index := mongo.IndexModel{
+ Keys: keysDoc,
+ }
+ return index
+}
func init() {
datasource.RegisterPlugin("mongo", NewFrom)
}
diff --git a/server/datasource/mongo/kv/kv_dao.go b/server/datasource/mongo/kv/kv_dao.go
index d44dd5f..1338fae 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -23,6 +23,7 @@ import (
"regexp"
"strings"
+ dmongo "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/openlog"
"go.mongodb.org/mongo-driver/bson"
@@ -32,7 +33,7 @@ import (
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/util"
"github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
)
const (
@@ -54,7 +55,7 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc, options ...datasource
}
func create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
_, err := collection.InsertOne(ctx, kv)
if err != nil {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
@@ -68,7 +69,7 @@ func create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
// txnCreate is to start transaction when creating KV, will create task in a transaction operation
func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
- taskSession, err := session.GetDB().Client().StartSession()
+ taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
if err != nil {
return nil, err
}
@@ -77,7 +78,7 @@ func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
}
defer taskSession.EndSession(ctx)
if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
_, err = collection.InsertOne(sessionContext, kv)
if err != nil {
openlog.Error("create error", openlog.WithTags(openlog.Tags{
@@ -105,7 +106,7 @@ func txnCreate(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error) {
}
return err
}
- collection = session.GetDB().Collection(session.CollectionTask)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
_, err = collection.InsertOne(sessionContext, task)
if err != nil {
openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -143,7 +144,7 @@ func (s *Dao) Update(ctx context.Context, kv *model.KVDoc, options ...datasource
}
func update(ctx context.Context, kv *model.KVDoc) error {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
_, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{
{Key: "$set", Value: bson.D{
{Key: "value", Value: kv.Value},
@@ -161,7 +162,7 @@ func update(ctx context.Context, kv *model.KVDoc) error {
// txnUpdate is to start transaction when updating kV, will create task in a transaction operation
func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
- taskSession, err := session.GetDB().Client().StartSession()
+ taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
if err != nil {
return err
}
@@ -170,7 +171,7 @@ func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
}
defer taskSession.EndSession(ctx)
if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
result := collection.FindOneAndUpdate(sessionContext, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{
{Key: "$set", Value: bson.D{
{Key: "value", Value: kv.Value},
@@ -218,7 +219,7 @@ func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
}
return err
}
- collection = session.GetDB().Collection(session.CollectionTask)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
_, err = collection.InsertOne(sessionContext, task)
if err != nil {
openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -253,7 +254,7 @@ func getValue(str string) string {
}
func findKV(ctx context.Context, domain string, project string, opts datasource.FindOptions) (*mongo.Cursor, int, error) {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
defer cancel()
@@ -308,7 +309,7 @@ func findKV(ctx context.Context, domain string, project string, opts datasource.
return cur, int(curTotal), err
}
func findOneKey(ctx context.Context, filter bson.M) ([]*model.KVDoc, error) {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
sr := collection.FindOne(ctx, filter)
if sr.Err() != nil {
if sr.Err() == mongo.ErrNoDocuments {
@@ -371,7 +372,7 @@ func (s *Dao) FindOneAndDelete(ctx context.Context, kvID, project, domain string
func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
curKV := &model.KVDoc{}
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
sr := collection.FindOneAndDelete(ctx, bson.M{"id": kvID, "project": project, "domain": domain})
if sr.Err() != nil {
if sr.Err() == mongo.ErrNoDocuments {
@@ -390,7 +391,7 @@ func findOneAndDelete(ctx context.Context, kvID, project, domain string) (*model
// txnFindOneAndDelete is to start transaction when delete KV, will create task and tombstone in a transaction operation
func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
curKV := &model.KVDoc{}
- taskSession, err := session.GetDB().Client().StartSession()
+ taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
if err != nil {
openlog.Error("fail to start session" + err.Error())
return nil, err
@@ -401,7 +402,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
}
defer taskSession.EndSession(ctx)
if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
sr := collection.FindOneAndDelete(sessionContext, bson.M{"id": kvID, "project": project, "domain": domain})
if sr.Err() != nil {
errAbort := taskSession.AbortTransaction(sessionContext)
@@ -441,7 +442,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
}
return err
}
- collection = session.GetDB().Collection(session.CollectionTask)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
_, err = collection.InsertOne(sessionContext, task)
if err != nil {
openlog.Error("create task error", openlog.WithTags(openlog.Tags{
@@ -458,7 +459,7 @@ func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*mo
return err
}
tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(curKV))
- collection = session.GetDB().Collection(session.CollectionTombstone)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone)
_, err = collection.InsertOne(sessionContext, tombstone)
if err != nil {
openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
@@ -507,7 +508,7 @@ func findManyAndDelete(ctx context.Context, kvIDs []string, project, domain stri
}
return nil, 0, err
}
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
dr, err := collection.DeleteMany(ctx, filter)
if err != nil {
openlog.Error(fmt.Sprintf("delete kvs [%v] failed : [%v]", kvIDs, err))
@@ -530,7 +531,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
return nil, 0, err
}
var deletedCount int64
- taskSession, err := session.GetDB().Client().StartSession()
+ taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
if err != nil {
openlog.Error("fail to start session" + err.Error())
return nil, 0, err
@@ -542,7 +543,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
defer taskSession.EndSession(ctx)
if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
filter := bson.D{
{Key: "id", Value: bson.M{"$in": kvIDs}},
{Key: "project", Value: project},
@@ -569,7 +570,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
tasksDoc[i] = task
tombstonesDoc[i] = tombstone
}
- collection = session.GetDB().Collection(session.CollectionTask)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
_, err = collection.InsertMany(sessionContext, tasksDoc)
if err != nil {
openlog.Error("create tasks error", openlog.WithTags(openlog.Tags{
@@ -583,7 +584,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
}
return err
}
- collection = session.GetDB().Collection(session.CollectionTombstone)
+ collection = dmongo.GetClient().GetDB().Collection(mmodel.CollectionTombstone)
_, err = collection.InsertMany(sessionContext, tombstonesDoc)
if err != nil {
openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
@@ -609,7 +610,7 @@ func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain s
}
func findKeys(ctx context.Context, filter interface{}, withoutLabel bool) ([]*model.KVDoc, error) {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
cur, err := collection.Find(ctx, filter)
if err != nil {
if err.Error() == context.DeadlineExceeded.Error() {
@@ -666,7 +667,7 @@ func (s *Dao) Get(ctx context.Context, req *model.GetKVRequest) (*model.KVDoc, e
}
func (s *Dao) Total(ctx context.Context, project, domain string) (int64, error) {
- collection := session.GetDB().Collection(session.CollectionKV)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
filter := bson.M{"domain": domain, "project": project}
total, err := collection.CountDocuments(ctx, filter)
if err != nil {
diff --git a/server/datasource/mongo/model/model.go b/server/datasource/mongo/model/model.go
new file mode 100644
index 0000000..4110e7b
--- /dev/null
+++ b/server/datasource/mongo/model/model.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+const (
+ CollectionKV = "kv"
+ CollectionKVRevision = "kv_revision"
+ CollectionPollingDetail = "polling_detail"
+ CollectionCounter = "counter"
+ CollectionView = "view"
+ CollectionTask = "task"
+ CollectionTombstone = "tombstone"
+)
diff --git a/server/datasource/mongo/session/session.go b/server/datasource/mongo/session/session.go
deleted file mode 100644
index e501b92..0000000
--- a/server/datasource/mongo/session/session.go
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//Package session manage db connection
-package session
-
-import (
- "context"
- "crypto/tls"
- "errors"
- "reflect"
- "strings"
- "sync"
- "time"
-
- "github.com/go-chassis/openlog"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/bsoncodec"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "gopkg.in/mgo.v2"
-
- "github.com/apache/servicecomb-kie/pkg/cipherutil"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/tlsutil"
-)
-
-//const for db name and collection name
-const (
- DBName = "servicecomb"
-
- CollectionLabel = "label"
- CollectionKV = "kv"
- CollectionKVRevision = "kv_revision"
- CollectionPollingDetail = "polling_detail"
- CollectionCounter = "counter"
- CollectionView = "view"
- CollectionTask = "task"
- CollectionTombstone = "tombstone"
-)
-
-//db errors
-var (
- ErrMissingDomain = errors.New("domain info missing, illegal access")
- ErrMissingProject = errors.New("project info missing, illegal access")
- ErrLabelNotExists = errors.New("labels does not exits")
-
- ErrKeyMustNotEmpty = errors.New("must supply key if you want to get exact one result")
-
- ErrIDIsNil = errors.New("id is empty")
- ErrKeyIsNil = errors.New("key must not be empty")
-
- ErrViewCreation = errors.New("can not create view")
- ErrViewUpdate = errors.New("can not update view")
- ErrViewDelete = errors.New("can not delete view")
- ErrViewNotExist = errors.New("view not exists")
- ErrViewFinding = errors.New("view search error")
- ErrGetPipeline = errors.New("can not get criteria")
-)
-
-const (
- MsgDBExists = "already exists"
- MsgDuplicate = "duplicate key error collection"
-)
-
-var client *mongo.Client
-var once sync.Once
-var db *mongo.Database
-
-//Init prepare params
-func Init(c *datasource.Config) error {
- var err error
- once.Do(func() {
- sc, _ := bsoncodec.NewStructCodec(bsoncodec.DefaultStructTagParser)
- reg := bson.NewRegistryBuilder().
- RegisterTypeEncoder(reflect.TypeOf(model.LabelDoc{}), sc).
- RegisterTypeEncoder(reflect.TypeOf(model.KVDoc{}), sc).
- Build()
- uri := cipherutil.TryDecrypt(c.URI)
- clientOps := []*options.ClientOptions{options.Client().ApplyURI(uri)}
- if c.SSLEnabled {
- var tc *tls.Config
- tc, err = tlsutil.Config(c)
- if err != nil {
- return
- }
- clientOps = append(clientOps, options.Client().SetTLSConfig(tc))
- openlog.Info("enabled ssl communication to mongodb")
- }
- client, err = mongo.NewClient(clientOps...)
- if err != nil {
- return
- }
- openlog.Info("DB connecting")
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- err = client.Connect(ctx)
- if err != nil {
- return
- }
- openlog.Info("DB connected")
- db = client.Database(DBName, &options.DatabaseOptions{
- Registry: reg,
- })
-
- })
- EnsureDB(c)
- return nil
-}
-
-//GetDB get mongo db client
-func GetDB() *mongo.Database {
- return db
-}
-
-//CreateView run mongo db command to create view
-func CreateView(ctx context.Context, view, source string, pipeline mongo.Pipeline) error {
- sr := GetDB().RunCommand(ctx,
- bson.D{
- {Key: "create", Value: view},
- {Key: "viewOn", Value: source},
- {Key: "pipeline", Value: pipeline},
- })
- if sr.Err() != nil {
- openlog.Error("can not create view: " + sr.Err().Error())
- return ErrViewCreation
- }
- return nil
-}
-
-//DropView deletes view
-func DropView(ctx context.Context, view string) error {
- err := GetDB().Collection(view).Drop(ctx)
- if err != nil {
- openlog.Error(err.Error())
- return err
- }
- return nil
-}
-
-//GetColInfo get collection info
-func GetColInfo(ctx context.Context, name string) (*CollectionInfo, error) {
- cur, err := GetDB().ListCollections(ctx, bson.M{"name": name, "type": "view"})
- if err != nil {
- openlog.Error(err.Error())
- return nil, ErrGetPipeline
- }
- defer cur.Close(ctx)
- if !cur.Next(ctx) {
- return nil, ErrGetPipeline
- }
- openlog.Debug(cur.Current.String())
- c := &CollectionInfo{}
- if err := cur.Decode(c); err != nil {
- openlog.Error(err.Error())
- return nil, ErrGetPipeline
- }
- return c, nil
-}
-
-//EnsureDB build mongo db schema
-func EnsureDB(c *datasource.Config) {
- session := OpenSession(c)
- defer session.Close()
- session.SetMode(mgo.Primary, true)
-
- ensureRevisionCounter(session)
-
- ensureKV(session)
-
- ensureKVRevision(session)
-
- ensureView(session)
-
- ensureKVLongPolling(session)
-
-}
-
-func OpenSession(c *datasource.Config) *mgo.Session {
- timeout := c.Timeout
- var uri string
- var err error
- uri = cipherutil.TryDecrypt(c.URI)
- session, err := mgo.DialWithTimeout(uri, timeout)
- if err != nil {
- openlog.Warn("can not dial db, retry once:" + err.Error())
- session, err = mgo.DialWithTimeout(uri, timeout)
- if err != nil {
- openlog.Fatal("can not dial db:" + err.Error())
- }
- }
- return session
-}
-
-func ensureKVLongPolling(session *mgo.Session) {
- c := session.DB(DBName).C(CollectionPollingDetail)
- err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
- "id": bson.M{"$exists": true},
- "revision": bson.M{"$exists": true},
- "session_id": bson.M{"$exists": true},
- "url_path": bson.M{"$exists": true},
- }})
- wrapError(err, MsgDBExists)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"timestamp"},
- ExpireAfter: 7 * 24 * time.Hour,
- })
- wrapError(err)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"revision", "domain", "session_id"},
- Unique: true,
- })
- wrapError(err)
-}
-
-func ensureView(session *mgo.Session) {
- c := session.DB(DBName).C(CollectionView)
- err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
- "id": bson.M{"$exists": true},
- "domain": bson.M{"$exists": true},
- "project": bson.M{"$exists": true},
- "display": bson.M{"$exists": true},
- }})
- wrapError(err, MsgDBExists)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"id"},
- Unique: true,
- })
- wrapError(err)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"display", "domain", "project"},
- Unique: true,
- })
- wrapError(err)
-}
-
-func ensureKVRevision(session *mgo.Session) {
- c := session.DB(DBName).C(CollectionKVRevision)
- err := c.EnsureIndex(mgo.Index{
- Key: []string{"delete_time"},
- ExpireAfter: 7 * 24 * time.Hour,
- })
- wrapError(err, MsgDBExists)
-}
-
-func ensureKV(session *mgo.Session) {
- c := session.DB(DBName).C(CollectionKV)
- err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
- "key": bson.M{"$exists": true},
- "domain": bson.M{"$exists": true},
- "project": bson.M{"$exists": true},
- "id": bson.M{"$exists": true},
- }})
- wrapError(err, MsgDBExists)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"id"},
- Unique: true,
- })
- wrapError(err)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"key", "label_format", "domain", "project"},
- Unique: true,
- })
- wrapError(err)
-}
-
-func ensureRevisionCounter(session *mgo.Session) {
- c := session.DB(DBName).C(CollectionCounter)
- err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
- "name": bson.M{"$exists": true},
- "domain": bson.M{"$exists": true},
- "count": bson.M{"$exists": true},
- }})
- wrapError(err, MsgDBExists)
- err = c.EnsureIndex(mgo.Index{
- Key: []string{"name", "domain"},
- Unique: true,
- })
- wrapError(err)
- docs := map[string]interface{}{"name": "revision_counter", "count": 1, "domain": "default"}
- err = c.Insert(docs)
- wrapError(err, MsgDuplicate)
-}
-
-func wrapError(err error, skipMsg ...string) {
- if err != nil {
- for _, str := range skipMsg {
- if strings.Contains(err.Error(), str) {
- openlog.Debug(err.Error())
- return
- }
- }
- openlog.Fatal(err.Error())
- }
-}
diff --git a/server/datasource/mongo/session/session_test.go b/server/datasource/mongo/session/session_test.go
deleted file mode 100644
index c5eaee1..0000000
--- a/server/datasource/mongo/session/session_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package session_test
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
- _ "github.com/apache/servicecomb-kie/test"
- "github.com/stretchr/testify/assert"
- "go.mongodb.org/mongo-driver/bson"
-)
-
-func TestGetColInfo(t *testing.T) {
- var err error
- err = session.Init(&datasource.Config{
- URI: "mongodb://kie:123@127.0.0.1:27017/servicecomb",
- Timeout: 10 * time.Second,
- })
- assert.NoError(t, err)
- err = session.CreateView(context.Background(), "test_view", session.CollectionKV, []bson.D{
- {{
- "$match",
- bson.D{{"domain", "default"}, {"project", "default"}},
- }},
- })
- assert.NoError(t, err)
- c, err := session.GetColInfo(context.Background(), "test_view")
- assert.NoError(t, err)
- assert.Equal(t, "default", c.Options.Pipeline[0]["$match"]["domain"])
- err = session.DropView(context.Background(), "test_view")
- assert.NoError(t, err)
-}
diff --git a/server/datasource/mongo/session/struct.go b/server/datasource/mongo/session/struct.go
deleted file mode 100644
index 44a5c62..0000000
--- a/server/datasource/mongo/session/struct.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package session
-
-//CollectionInfo is struct
-type CollectionInfo struct {
- Options Options `json:"options"`
-}
-
-//Options is struct
-type Options struct {
- Pipeline []map[string]map[string]string `json:"pipeline"`
-}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go b/server/datasource/mongo/track/polling_detail_dao.go
index 5c51e67..27ceae8 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -20,13 +20,15 @@ package track
import (
"context"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ dmongo "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/openlog"
"github.com/gofrs/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/datasource"
+ mmodel "github.com/apache/servicecomb-kie/server/datasource/mongo/model"
)
//Dao is the implementation
@@ -36,7 +38,7 @@ type Dao struct {
//CreateOrUpdate create a record or update exist record
//If revision and session_id exists then update else insert
func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (*model.PollingDetail, error) {
- collection := session.GetDB().Collection(session.CollectionPollingDetail)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionPollingDetail)
queryFilter := bson.M{"revision": detail.Revision, "domain": detail.Domain, "session_id": detail.SessionID}
res := collection.FindOne(ctx, queryFilter)
if res.Err() != nil {
@@ -63,7 +65,7 @@ func (s *Dao) CreateOrUpdate(ctx context.Context, detail *model.PollingDetail) (
//Get is to get a track data
func (s *Dao) GetPollingDetail(ctx context.Context, detail *model.PollingDetail) ([]*model.PollingDetail, error) {
- collection := session.GetDB().Collection(session.CollectionPollingDetail)
+ collection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionPollingDetail)
queryFilter := bson.M{"domain": detail.Domain}
if detail.SessionID != "" {
queryFilter["session_id"] = detail.SessionID
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 225abbc..4baff07 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
"github.com/apache/servicecomb-kie/server/pubsub"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/cari/config"
@@ -69,6 +68,10 @@ func NewObserver() (*pubsub.Observer, error) {
//err
var (
ErrInvalidRev = errors.New(common.MsgInvalidRev)
+
+ ErrMissingDomain = errors.New("domain info missing, illegal access")
+ ErrMissingProject = errors.New("project info missing, illegal access")
+ ErrIDIsNil = errors.New("id is empty")
)
//ReadClaims get auth info
@@ -242,7 +245,7 @@ func checkPagination(offsetStr, limitStr string) (int64, int64, error) {
func validateGet(domain, project, kvID string) error {
if kvID == "" {
- return session.ErrIDIsNil
+ return ErrIDIsNil
}
return checkDomainAndProject(domain, project)
}
@@ -257,10 +260,10 @@ func validateDeleteList(domain, project string) error {
func checkDomainAndProject(domain, project string) error {
if domain == "" {
- return session.ErrMissingDomain
+ return ErrMissingDomain
}
if project == "" {
- return session.ErrMissingProject
+ return ErrMissingProject
}
return nil
}
diff --git a/test/init.go b/test/init.go
index 7732267..489fd19 100644
--- a/test/init.go
+++ b/test/init.go
@@ -18,23 +18,21 @@
package test
import (
- "time"
+ "github.com/go-chassis/go-archaius"
+ "github.com/go-chassis/go-chassis/v2/security/cipher"
"github.com/apache/servicecomb-kie/pkg/validator"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
edatasource "github.com/apache/servicecomb-service-center/eventbase/datasource"
- "github.com/go-chassis/cari/db"
- "github.com/go-chassis/go-archaius"
- "github.com/go-chassis/go-chassis/v2/security/cipher"
+
+ _ "github.com/go-chassis/cari/db/bootstrap"
_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
_ "github.com/apache/servicecomb-kie/server/datasource/mongo"
_ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
_ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
_ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
- _ "github.com/little-cui/etcdadpt/embedded"
- _ "github.com/little-cui/etcdadpt/remote"
)
var (
@@ -75,16 +73,9 @@ func init() {
if err != nil {
panic(err)
}
- if kind != "embedded_etcd" {
- eventbaseDBCfg := db.Config{
- URI: uri,
- Kind: kind,
- Timeout: 10 * time.Second,
- }
- err = edatasource.Init(eventbaseDBCfg)
- if err != nil {
- panic(err)
- }
+ err = edatasource.Init(kind)
+ if err != nil {
+ panic(err)
}
}