You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/11/20 10:12:04 UTC
[servicecomb-kie] 01/01: add config sync task
This is an automated email from the ASF dual-hosted git repository.
robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
commit defe7306ff15b9de7256889286ef8daf1fae8d92
Author: robotljw <79...@qq.com>
AuthorDate: Fri Nov 19 20:46:44 2021 +0800
add config sync task
---
examples/dev/kie-conf.yaml | 2 +
pkg/model/db_schema.go | 9 +++
server/config/struct.go | 1 +
server/datasource/dao.go | 23 ++++++--
server/datasource/etcd/init.go | 1 +
server/datasource/etcd/key/key.go | 10 ++++
server/datasource/etcd/kv/kv_dao.go | 88 ++++++++++++++++++++++++++++++
server/datasource/mongo/init.go | 1 +
server/datasource/mongo/kv/kv_dao.go | 58 ++++++++++++++++++++
server/datasource/mongo/session/session.go | 1 +
server/service/kv/kv_svc.go | 44 ++++++++++++---
11 files changed, 225 insertions(+), 13 deletions(-)
diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 7d10d4a..ffd38b1 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -7,6 +7,8 @@ db:
# kind=embedded_etcd, then is the embedded etcd server's advertise-peer-urls, e.g. default=http://127.0.0.1:2380
#uri: mongodb://kie:123@127.0.0.1:27017/kie
uri: http://127.0.0.1:2379
+ # when the synchronization switch is turned on, related operations are also written to the db as sync tasks
+ syncEnabled: false
# poolSize: 10
# timeout: 5m
# sslEnabled: false
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 0434dcd..acfe42b 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -48,6 +48,15 @@ type KVDoc struct {
Domain string `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant
}
+// Task is db struct to store sync task
+// A Task is written together with the kv it describes when db sync is enabled.
+type Task struct {
+ // Action names the operation that produced the task, e.g. "create" or "update".
+ Action string `json:"action" bson:"action"`
+ // DataType names the kind of payload carried in Data, e.g. "config".
+ DataType string `json:"data_type" bson:"data_type"`
+ // Data is the payload of the operation; it is untyped, so consumers must
+ // know the concrete type from Action and DataType.
+ Data interface{} `json:"data" bson:"data"`
+ // Timestamp is stored as a string; the kv service fills it with Unix seconds
+ // in base 10, and the etcd datasource uses it as the last task key segment.
+ Timestamp string `json:"timestamp" bson:"timestamp"`
+ // Status is presumably the processing state of the task.
+ // NOTE(review): no visible code sets it; confirm the allowed values.
+ Status string `json:"status" bson:"status"`
+}
+
//ViewDoc is db struct, it saves user's custom view name and criteria
type ViewDoc struct {
ID string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
diff --git a/server/config/struct.go b/server/config/struct.go
index 357a84b..d19d2b1 100644
--- a/server/config/struct.go
+++ b/server/config/struct.go
@@ -41,6 +41,7 @@ type DB struct {
CertPwdFile string `yaml:"certPwdFile"`
Timeout string `yaml:"timeout"`
VerifyPeer bool `yaml:"verifyPeer"`
+ SyncEnable bool `yaml:"syncEnabled"`
}
//RBAC is rbac config
diff --git a/server/datasource/dao.go b/server/datasource/dao.go
index 3ec3e74..6607131 100644
--- a/server/datasource/dao.go
+++ b/server/datasource/dao.go
@@ -35,12 +35,13 @@ var (
)
var (
- ErrKeyNotExists = errors.New("can not find any key value")
- ErrRecordNotExists = errors.New("can not find any polling data")
- ErrRevisionNotExist = errors.New("revision does not exist")
- ErrAliasNotGiven = errors.New("label alias not given")
- ErrKVAlreadyExists = errors.New("kv already exists")
- ErrTooMany = errors.New("key with labels should be only one")
+ ErrKeyNotExists = errors.New("can not find any key value")
+ ErrRecordNotExists = errors.New("can not find any polling data")
+ ErrRevisionNotExist = errors.New("revision does not exist")
+ ErrAliasNotGiven = errors.New("label alias not given")
+ ErrKVAlreadyExists = errors.New("kv already exists")
+ ErrKVTaskAlreadyExists = errors.New("kv or sync task already exists")
+ ErrTooMany = errors.New("key with labels should be only one")
)
const (
@@ -73,6 +74,10 @@ type KVDao interface {
Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
Update(ctx context.Context, kv *model.KVDoc) error
List(ctx context.Context, project, domain string, options ...FindOption) (*model.KVResponse, error)
+
+ CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error)
+ UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error
+
//FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion
FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error)
//FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion
@@ -84,6 +89,12 @@ type KVDao interface {
Total(ctx context.Context, project, domain string) (int64, error)
}
+// TaskDao provides the API of the Task entity.
+// NOTE(review): no implementation of this interface is visible here; confirm
+// the exact semantics against the datasource implementations.
+type TaskDao interface {
+ // Create presumably stores task for the given domain and project and
+ // returns the stored task; verify against the implementation.
+ Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error)
+ // Update presumably replaces a stored task for the given domain and
+ // project; verify against the implementation.
+ Update(ctx context.Context, task *model.Task, domain string, project string) error
+}
+
//HistoryDao provide api of History entity
type HistoryDao interface {
AddHistory(ctx context.Context, kv *model.KVDoc) error
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index fdd7596..622a314 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -69,6 +69,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}
+
func init() {
datasource.RegisterPlugin("etcd", NewFrom)
datasource.RegisterPlugin("embedded_etcd", NewFrom)
diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go
index 1df4665..bd7ac15 100644
--- a/server/datasource/etcd/key/key.go
+++ b/server/datasource/etcd/key/key.go
@@ -28,8 +28,18 @@ const (
keyCounter = "counter"
keyHistory = "kv-history"
keyTrack = "track"
+ sync = "sync"
+ task = "task"
)
+// getSyncRootKey returns the root prefix shared by all sync keys:
+// the key separator immediately followed by the sync segment.
+func getSyncRootKey() string {
+	root := split + sync
+	return root
+}
+
+// TaskKey builds the key of a sync task by joining, with the key separator,
+// the sync root key, the task segment, domain, project and timestamp.
+func TaskKey(domain, project, timestamp string) string {
+	segments := []string{getSyncRootKey(), task, domain, project, timestamp}
+	return strings.Join(segments, split)
+}
+
func KV(domain, project, kvID string) string {
return strings.Join([]string{keyKV, domain, project, kvID}, split)
}
diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go
index d3f02b0..a52a37f 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -20,6 +20,7 @@ package kv
import (
"context"
"encoding/json"
+ "fmt"
"regexp"
"strings"
@@ -59,6 +60,93 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
return kv, nil
}
+// CreateWithTask writes kv and its sync task in one etcd transaction.
+// It is used instead of Create when synchronization is turned on.
+//
+// Both puts are guarded by a compare on the create revision of their key
+// being equal to 0. If the guard does not hold, nothing is written and
+// datasource.ErrKVTaskAlreadyExists is returned. On success the given kv is
+// returned unchanged.
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	kvBytes, err := json.Marshal(kv)
+	if err != nil {
+		openlog.Error("fail to marshal kv")
+		return nil, err
+	}
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error("fail to marshal task ")
+		return nil, err
+	}
+
+	kvKey := key.KV(kv.Domain, kv.Project, kv.ID)
+	taskKey := key.TaskKey(kv.Domain, kv.Project, task.Timestamp)
+	puts := []etcdadpt.OpOptions{
+		etcdadpt.OpPut(etcdadpt.WithStrKey(kvKey), etcdadpt.WithValue(kvBytes)),
+		etcdadpt.OpPut(etcdadpt.WithStrKey(taskKey), etcdadpt.WithValue(taskBytes)),
+	}
+	guards := []etcdadpt.CmpOptions{
+		etcdadpt.OpCmp(etcdadpt.CmpCreateRev(puts[0].Key), etcdadpt.CmpEqual, 0),
+		etcdadpt.OpCmp(etcdadpt.CmpCreateRev(puts[1].Key), etcdadpt.CmpEqual, 0),
+	}
+
+	// logFailure reports a failed create with the same tags for every cause.
+	logFailure := func(cause error) {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  cause.Error(),
+			"kv":   kv,
+			"task": task,
+		}))
+	}
+
+	resp, err := etcdadpt.Instance().TxnWithCmp(ctx, puts, guards, nil)
+	if err != nil {
+		logFailure(err)
+		return nil, err
+	}
+	if resp.Succeeded {
+		return kv, nil
+	}
+	logFailure(datasource.ErrKVTaskAlreadyExists)
+	return nil, datasource.ErrKVTaskAlreadyExists
+}
+
+// UpdateWithTask updates an existing kv and writes its sync task in one etcd
+// transaction. It is used instead of Update when synchronization is turned on.
+//
+// The stored kv is loaded first and only its LabelFormat, Value, Status,
+// Checker, UpdateTime and UpdateRevision are overwritten from kv. It returns
+// datasource.ErrRecordNotExists when the kv cannot be found and
+// datasource.ErrKVTaskAlreadyExists when the task key is already present, in
+// which case nothing is written.
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+	keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
+	resp, err := etcdadpt.Get(ctx, keyKv)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	if resp == nil {
+		return datasource.ErrRecordNotExists
+	}
+
+	var old model.KVDoc
+	err = json.Unmarshal(resp.Value, &old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	old.LabelFormat = kv.LabelFormat
+	old.Value = kv.Value
+	old.Status = kv.Status
+	old.Checker = kv.Checker
+	old.UpdateTime = kv.UpdateTime
+	old.UpdateRevision = kv.UpdateRevision
+
+	kvBytes, err := json.Marshal(old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+
+	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKv), etcdadpt.WithValue(kvBytes))
+	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+	// Only the task key is guarded. The kv key already exists on update, so
+	// requiring its create revision to be 0 would make the transaction fail
+	// every time.
+	taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
+	response, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{taskOpCmp}, nil)
+	if err != nil {
+		openlog.Error(fmt.Sprintf("update kv [%s] with sync task failed: %s", keyKv, err.Error()))
+		return err
+	}
+	// A transaction whose compare failed returns no error; without this check
+	// the caller would see success although nothing was written.
+	if !response.Succeeded {
+		openlog.Error(fmt.Sprintf("update kv [%s] with sync task failed: %s", keyKv, datasource.ErrKVTaskAlreadyExists.Error()))
+		return datasource.ErrKVTaskAlreadyExists
+	}
+	return nil
+}
+
//Update update key value
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index 230bc8e..2b4ae33 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -46,6 +46,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}
+
func init() {
datasource.RegisterPlugin("mongo", NewFrom)
}
diff --git a/server/datasource/mongo/kv/kv_dao.go b/server/datasource/mongo/kv/kv_dao.go
index 22d832e..9a13813 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -57,6 +57,64 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
return kv, nil
}
+// CreateWithTask inserts kv into the kv collection and task into the task
+// collection inside one mongo transaction. It is used instead of Create when
+// synchronization is turned on.
+//
+// If either insert fails the transaction is aborted and the insert error is
+// returned; a failure to abort is only logged. On success the given kv is
+// returned unchanged.
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	taskSession, err := session.GetDB().Client().StartSession()
+	if err != nil {
+		return nil, err
+	}
+	// Register the cleanup before StartTransaction so the session is released
+	// even when the transaction cannot be started.
+	defer taskSession.EndSession(ctx)
+	if err = taskSession.StartTransaction(); err != nil {
+		return nil, err
+	}
+	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
+		// abort rolls the transaction back after a failed insert and logs,
+		// but does not return, an abort failure so the insert error wins.
+		abort := func(insertErr error, tagName string, tagValue interface{}) error {
+			openlog.Error("create error", openlog.WithTags(openlog.Tags{
+				"err":   insertErr.Error(),
+				tagName: tagValue,
+			}))
+			if errAbort := taskSession.AbortTransaction(sessionContext); errAbort != nil {
+				openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{
+					"err":   errAbort.Error(),
+					tagName: tagValue,
+				}))
+			}
+			return insertErr
+		}
+
+		kvCollection := session.GetDB().Collection(session.CollectionKV)
+		if _, errInsert := kvCollection.InsertOne(sessionContext, kv); errInsert != nil {
+			return abort(errInsert, "kv", kv)
+		}
+
+		taskCollection := session.GetDB().Collection(session.CollectionTask)
+		if _, errInsert := taskCollection.InsertOne(sessionContext, task); errInsert != nil {
+			return abort(errInsert, "task", task)
+		}
+
+		return taskSession.CommitTransaction(sessionContext)
+	}); err != nil {
+		openlog.Error(err.Error())
+		return nil, err
+	}
+	return kv, nil
+}
+
+// UpdateWithTask is meant to update kv and record its sync task.
+// NOTE(review): this is currently a stub. It writes nothing and always
+// returns nil, so a caller that takes this path sees success although neither
+// the kv nor the task was persisted.
+// TODO: implement it in a transaction, or return an explicit error until then.
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+ return nil
+}
+
//Update update key value
func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
collection := session.GetDB().Collection(session.CollectionKV)
diff --git a/server/datasource/mongo/session/session.go b/server/datasource/mongo/session/session.go
index 94a4ce9..75c5bd3 100644
--- a/server/datasource/mongo/session/session.go
+++ b/server/datasource/mongo/session/session.go
@@ -49,6 +49,7 @@ const (
CollectionPollingDetail = "polling_detail"
CollectionCounter = "counter"
CollectionView = "view"
+ CollectionTask = "task"
)
//db errors
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index 3ed06ec..3a573d2 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,12 +20,14 @@ package kv
import (
"context"
"fmt"
+ "strconv"
"time"
"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/concurrency"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
+ cfg "github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/go-chassis/cari/config"
@@ -111,10 +113,25 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
openlog.Error(err.Error())
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
- kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
- if err != nil {
- openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
- return nil, config.NewError(config.ErrInternal, "create kv failed")
+ // open synchronization needs to write tasks to db
+ if cfg.GetDB().SyncEnable {
+ task := &model.Task{
+ Action: "create",
+ DataType: "config",
+ Data: kv,
+ Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+ }
+ kv, err = datasource.GetBroker().GetKVDao().CreateWithTask(ctx, kv, task)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+ return nil, config.NewError(config.ErrInternal, "create kv failed")
+ }
+ } else {
+ kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+ return nil, config.NewError(config.ErrInternal, "create kv failed")
+ }
}
err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
if err != nil {
@@ -228,9 +245,22 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
if err != nil {
return nil, err
}
- err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
- if err != nil {
- return nil, err
+ if cfg.GetDB().SyncEnable {
+ task := &model.Task{
+ Action: "update",
+ DataType: "config",
+ Data: kv,
+ Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+ }
+ err = datasource.GetBroker().GetKVDao().UpdateWithTask(ctx, oldKV, task)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
+ if err != nil {
+ return nil, err
+ }
}
openlog.Info(
fmt.Sprintf("update %s with labels %s value [%s]",