You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/11/19 12:50:06 UTC
[servicecomb-kie] branch master updated: add config sync task
This is an automated email from the ASF dual-hosted git repository.
robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 4196c12 add config sync task
4196c12 is described below
commit 4196c12917d24b7630cea8a15e3048b1eeb56777
Author: robotljw <79...@qq.com>
AuthorDate: Fri Nov 19 20:46:44 2021 +0800
add config sync task
---
pkg/model/db_schema.go | 9 +++++++
server/datasource/dao.go | 20 ++++++++++-----
server/datasource/etcd/init.go | 6 +++++
server/datasource/etcd/key/key.go | 6 +++++
server/datasource/etcd/task/task_dao.go | 42 ++++++++++++++++++++++++++++++++
server/datasource/mongo/init.go | 6 +++++
server/datasource/mongo/task/task_dao.go | 17 +++++++++++++
server/service/kv/kv_svc.go | 12 +++++++++
8 files changed, 112 insertions(+), 6 deletions(-)
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 0434dcd..acfe42b 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -48,6 +48,15 @@ type KVDoc struct {
Domain string `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant
}
+// Task is db struct to store sync task
+type Task struct {
+ Action string `json:"action" bson:"action"`
+ DataType string `json:"data_type" bson:"data_type"`
+ Data interface{} `json:"data" bson:"data"`
+ Timestamp string `json:"timestamp" bson:"timestamp"`
+ Status string `json:"status" bson:"status"`
+}
+
//ViewDoc is db struct, it saves user's custom view name and criteria
type ViewDoc struct {
ID string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
diff --git a/server/datasource/dao.go b/server/datasource/dao.go
index 3ec3e74..196782f 100644
--- a/server/datasource/dao.go
+++ b/server/datasource/dao.go
@@ -35,12 +35,13 @@ var (
)
var (
- ErrKeyNotExists = errors.New("can not find any key value")
- ErrRecordNotExists = errors.New("can not find any polling data")
- ErrRevisionNotExist = errors.New("revision does not exist")
- ErrAliasNotGiven = errors.New("label alias not given")
- ErrKVAlreadyExists = errors.New("kv already exists")
- ErrTooMany = errors.New("key with labels should be only one")
+ ErrKeyNotExists = errors.New("can not find any key value")
+ ErrRecordNotExists = errors.New("can not find any polling data")
+ ErrRevisionNotExist = errors.New("revision does not exist")
+ ErrAliasNotGiven = errors.New("label alias not given")
+ ErrKVAlreadyExists = errors.New("kv already exists")
+ ErrTaskAlreadyExists = errors.New("sync task already exists")
+ ErrTooMany = errors.New("key with labels should be only one")
)
const (
@@ -61,6 +62,7 @@ type Broker interface {
GetHistoryDao() HistoryDao
GetTrackDao() TrackDao
GetKVDao() KVDao
+ GetTaskDao() TaskDao
}
func GetBroker() Broker {
@@ -84,6 +86,12 @@ type KVDao interface {
Total(ctx context.Context, project, domain string) (int64, error)
}
+// TaskDao provide api of Task entity
+type TaskDao interface {
+ Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error)
+ Update(ctx context.Context, task *model.Task, domain string, project string) error
+}
+
//HistoryDao provide api of History entity
type HistoryDao interface {
AddHistory(ctx context.Context, kv *model.KVDoc) error
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index fdd7596..eeb5e07 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/servicecomb-kie/server/datasource/etcd/counter"
"github.com/apache/servicecomb-kie/server/datasource/etcd/history"
"github.com/apache/servicecomb-kie/server/datasource/etcd/kv"
+ "github.com/apache/servicecomb-kie/server/datasource/etcd/task"
"github.com/apache/servicecomb-kie/server/datasource/etcd/track"
"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
"github.com/go-chassis/openlog"
@@ -69,6 +70,11 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}
+
+func (*Broker) GetTaskDao() datasource.TaskDao {
+ return &task.Dao{}
+}
+
func init() {
datasource.RegisterPlugin("etcd", NewFrom)
datasource.RegisterPlugin("embedded_etcd", NewFrom)
diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go
index 1df4665..7cb1a33 100644
--- a/server/datasource/etcd/key/key.go
+++ b/server/datasource/etcd/key/key.go
@@ -28,8 +28,14 @@ const (
keyCounter = "counter"
keyHistory = "kv-history"
keyTrack = "track"
+ sync = "/sync"
+ task = "task"
)
+func TaskKey(domain, project, timestamp string) string {
+ return strings.Join([]string{sync, task, domain, project, timestamp}, split)
+}
+
func KV(domain, project, kvID string) string {
return strings.Join([]string{keyKV, domain, project, kvID}, split)
}
diff --git a/server/datasource/etcd/task/task_dao.go b/server/datasource/etcd/task/task_dao.go
new file mode 100644
index 0000000..3fdb398
--- /dev/null
+++ b/server/datasource/etcd/task/task_dao.go
@@ -0,0 +1,42 @@
+package task
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/datasource"
+ "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
+ "github.com/go-chassis/openlog"
+ "github.com/little-cui/etcdadpt"
+)
+
+type Dao struct {
+}
+
+func (d *Dao) Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error) {
+ bytes, err := json.Marshal(task)
+ if err != nil {
+ openlog.Error("marshal task ")
+ return nil, err
+ }
+ ok, err := etcdadpt.InsertBytes(ctx, key.TaskKey(domain, project, task.Timestamp), bytes)
+ if err != nil {
+ openlog.Error("create error", openlog.WithTags(openlog.Tags{
+ "err": err.Error(),
+ "task": task,
+ }))
+ return nil, err
+ }
+ if !ok {
+ openlog.Error("create error", openlog.WithTags(openlog.Tags{
+ "err": datasource.ErrTaskAlreadyExists.Error(),
+ "task": task,
+ }))
+ return nil, datasource.ErrTaskAlreadyExists
+ }
+ return task, nil
+}
+
+func (d *Dao) Update(ctx context.Context, task *model.Task, domain string, project string) error {
+ return nil
+}
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index 230bc8e..4266d14 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/servicecomb-kie/server/datasource/mongo/history"
"github.com/apache/servicecomb-kie/server/datasource/mongo/kv"
"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+ "github.com/apache/servicecomb-kie/server/datasource/mongo/task"
"github.com/apache/servicecomb-kie/server/datasource/mongo/track"
"github.com/go-chassis/openlog"
)
@@ -46,6 +47,11 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
func (*Broker) GetTrackDao() datasource.TrackDao {
return &track.Dao{}
}
+
+func (*Broker) GetTaskDao() datasource.TaskDao {
+ return &task.Dao{}
+}
+
func init() {
datasource.RegisterPlugin("mongo", NewFrom)
}
diff --git a/server/datasource/mongo/task/task_dao.go b/server/datasource/mongo/task/task_dao.go
new file mode 100644
index 0000000..ad32464
--- /dev/null
+++ b/server/datasource/mongo/task/task_dao.go
@@ -0,0 +1,17 @@
+package task
+
+import (
+ "context"
+ "github.com/apache/servicecomb-kie/pkg/model"
+)
+
+type Dao struct {
+}
+
+func (d *Dao) Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error) {
+ return nil, nil
+}
+
+func (d *Dao) Update(ctx context.Context, task *model.Task, domain string, project string) error {
+ return nil
+}
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index 3ed06ec..9b712e8 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,6 +20,7 @@ package kv
import (
"context"
"fmt"
+ "strconv"
"time"
"github.com/apache/servicecomb-kie/pkg/common"
@@ -116,6 +117,17 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
return nil, config.NewError(config.ErrInternal, "create kv failed")
}
+ task := &model.Task{
+ Action: "create",
+ DataType: "config",
+ Data: kv,
+ Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+ }
+ _, err = datasource.GetBroker().GetTaskDao().Create(ctx, task, kv.Domain, kv.Project)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("fail to create task, err:%s ", err))
+ }
+
err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
if err != nil {
openlog.Warn(