You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/11/19 12:50:06 UTC

[servicecomb-kie] branch master updated: add config sync task

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 4196c12  add config sync task
4196c12 is described below

commit 4196c12917d24b7630cea8a15e3048b1eeb56777
Author: robotljw <79...@qq.com>
AuthorDate: Fri Nov 19 20:46:44 2021 +0800

    add config sync task
---
 pkg/model/db_schema.go                   |  9 +++++++
 server/datasource/dao.go                 | 20 ++++++++++-----
 server/datasource/etcd/init.go           |  6 +++++
 server/datasource/etcd/key/key.go        |  6 +++++
 server/datasource/etcd/task/task_dao.go  | 42 ++++++++++++++++++++++++++++++++
 server/datasource/mongo/init.go          |  6 +++++
 server/datasource/mongo/task/task_dao.go | 17 +++++++++++++
 server/service/kv/kv_svc.go              | 12 +++++++++
 8 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 0434dcd..acfe42b 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -48,6 +48,15 @@ type KVDoc struct {
 	Domain string            `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"`              //redundant
 }
 
+// Task is the db struct used to store one sync task
+type Task struct {
+	Action    string      `json:"action" bson:"action"`       // operation the task records; the kv service sets "create"
+	DataType  string      `json:"data_type" bson:"data_type"` // kind of payload held in Data; the kv service sets "config"
+	Data      interface{} `json:"data" bson:"data"`           // payload of the task; the kv service stores the created *KVDoc here
+	Timestamp string      `json:"timestamp" bson:"timestamp"` // the kv service sets Unix seconds in base 10; also the last segment of the etcd task key
+	Status    string      `json:"status" bson:"status"`       // NOTE(review): never set by the code in this commit; valid values are not visible here
+}
+
 //ViewDoc is db struct, it saves user's custom view name and criteria
 type ViewDoc struct {
 	ID       string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
diff --git a/server/datasource/dao.go b/server/datasource/dao.go
index 3ec3e74..196782f 100644
--- a/server/datasource/dao.go
+++ b/server/datasource/dao.go
@@ -35,12 +35,13 @@ var (
 )
 
 var (
-	ErrKeyNotExists     = errors.New("can not find any key value")
-	ErrRecordNotExists  = errors.New("can not find any polling data")
-	ErrRevisionNotExist = errors.New("revision does not exist")
-	ErrAliasNotGiven    = errors.New("label alias not given")
-	ErrKVAlreadyExists  = errors.New("kv already exists")
-	ErrTooMany          = errors.New("key with labels should be only one")
+	ErrKeyNotExists      = errors.New("can not find any key value")
+	ErrRecordNotExists   = errors.New("can not find any polling data")
+	ErrRevisionNotExist  = errors.New("revision does not exist")
+	ErrAliasNotGiven     = errors.New("label alias not given")
+	ErrKVAlreadyExists   = errors.New("kv already exists")
+	ErrTaskAlreadyExists = errors.New("sync task already exists")
+	ErrTooMany           = errors.New("key with labels should be only one")
 )
 
 const (
@@ -61,6 +62,7 @@ type Broker interface {
 	GetHistoryDao() HistoryDao
 	GetTrackDao() TrackDao
 	GetKVDao() KVDao
+	GetTaskDao() TaskDao
 }
 
 func GetBroker() Broker {
@@ -84,6 +86,12 @@ type KVDao interface {
 	Total(ctx context.Context, project, domain string) (int64, error)
 }
 
+// TaskDao provides the storage API of the sync Task entity
+type TaskDao interface {
+	Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error) // etcd implementation stores the task and returns ErrTaskAlreadyExists when its key is already taken
+	Update(ctx context.Context, task *model.Task, domain string, project string) error                // NOTE(review): both implementations in this commit are no-ops that return nil
+}
+
 //HistoryDao provide api of History entity
 type HistoryDao interface {
 	AddHistory(ctx context.Context, kv *model.KVDoc) error
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index fdd7596..eeb5e07 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/counter"
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/history"
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/kv"
+	"github.com/apache/servicecomb-kie/server/datasource/etcd/task"
 	"github.com/apache/servicecomb-kie/server/datasource/etcd/track"
 	"github.com/apache/servicecomb-kie/server/datasource/tlsutil"
 	"github.com/go-chassis/openlog"
@@ -69,6 +70,11 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
+// GetTaskDao returns the etcd implementation of datasource.TaskDao.
+func (*Broker) GetTaskDao() datasource.TaskDao {
+	return new(task.Dao)
+}
+
 func init() {
 	datasource.RegisterPlugin("etcd", NewFrom)
 	datasource.RegisterPlugin("embedded_etcd", NewFrom)
diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go
index 1df4665..7cb1a33 100644
--- a/server/datasource/etcd/key/key.go
+++ b/server/datasource/etcd/key/key.go
@@ -28,8 +28,14 @@ const (
 	keyCounter = "counter"
 	keyHistory = "kv-history"
 	keyTrack   = "track"
+	sync       = "/sync"
+	task       = "task"
 )
 
+// TaskKey builds the etcd key of a sync task: the sync and task prefixes
+// followed by domain, project and timestamp, all joined with split.
+func TaskKey(domain, project, timestamp string) string {
+	segments := []string{sync, task, domain, project, timestamp}
+	return strings.Join(segments, split)
+}
+
 func KV(domain, project, kvID string) string {
 	return strings.Join([]string{keyKV, domain, project, kvID}, split)
 }
diff --git a/server/datasource/etcd/task/task_dao.go b/server/datasource/etcd/task/task_dao.go
new file mode 100644
index 0000000..3fdb398
--- /dev/null
+++ b/server/datasource/etcd/task/task_dao.go
@@ -0,0 +1,42 @@
+package task
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/apache/servicecomb-kie/pkg/model"
+	"github.com/apache/servicecomb-kie/server/datasource"
+	"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
+	"github.com/go-chassis/openlog"
+	"github.com/little-cui/etcdadpt"
+)
+
+// Dao is the etcd implementation of datasource.TaskDao.
+type Dao struct {
+}
+
+// Create stores task in etcd as JSON under key.TaskKey(domain, project, task.Timestamp).
+//
+// The key depends only on domain, project and task.Timestamp, so a second
+// task with the same three values is rejected with
+// datasource.ErrTaskAlreadyExists. Marshal and etcd errors are logged and
+// returned as is. On success the task that was passed in is returned.
+func (d *Dao) Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error) {
+	data, err := json.Marshal(task)
+	if err != nil {
+		// Log the cause; the message alone does not say why marshalling failed.
+		openlog.Error("marshal task error", openlog.WithTags(openlog.Tags{
+			"err": err.Error(),
+		}))
+		return nil, err
+	}
+	ok, err := etcdadpt.InsertBytes(ctx, key.TaskKey(domain, project, task.Timestamp), data)
+	if err == nil && !ok {
+		// The insert was not applied, which this DAO reports as a duplicate task.
+		err = datasource.ErrTaskAlreadyExists
+	}
+	if err != nil {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  err.Error(),
+			"task": task,
+		}))
+		return nil, err
+	}
+	return task, nil
+}
+
+// Update currently does nothing and always returns nil; no task is modified in etcd.
+func (d *Dao) Update(ctx context.Context, task *model.Task, domain string, project string) error {
+	return nil
+}
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index 230bc8e..4266d14 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/history"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/kv"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
+	"github.com/apache/servicecomb-kie/server/datasource/mongo/task"
 	"github.com/apache/servicecomb-kie/server/datasource/mongo/track"
 	"github.com/go-chassis/openlog"
 )
@@ -46,6 +47,11 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
+// GetTaskDao returns the mongo implementation of datasource.TaskDao.
+func (*Broker) GetTaskDao() datasource.TaskDao {
+	return new(task.Dao)
+}
+
 func init() {
 	datasource.RegisterPlugin("mongo", NewFrom)
 }
diff --git a/server/datasource/mongo/task/task_dao.go b/server/datasource/mongo/task/task_dao.go
new file mode 100644
index 0000000..ad32464
--- /dev/null
+++ b/server/datasource/mongo/task/task_dao.go
@@ -0,0 +1,17 @@
+package task
+
+import (
+	"context"
+	"github.com/apache/servicecomb-kie/pkg/model"
+)
+
+// Dao is the mongo implementation of datasource.TaskDao.
+type Dao struct {
+}
+
+// Create does not persist anything: it hands back the task it was given with
+// a nil error. Returning the task rather than nil keeps the result usable by
+// callers that dereference it after checking the error, and matches what the
+// etcd implementation returns on success.
+func (d *Dao) Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error) {
+	return task, nil
+}
+
+// Update currently does nothing and always returns nil; no task is modified in mongo.
+func (d *Dao) Update(ctx context.Context, task *model.Task, domain string, project string) error {
+	return nil
+}
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index 3ed06ec..9b712e8 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,6 +20,7 @@ package kv
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/apache/servicecomb-kie/pkg/common"
@@ -116,6 +117,17 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
 		openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
 		return nil, config.NewError(config.ErrInternal, "create kv failed")
 	}
+	task := &model.Task{
+		Action:    "create",
+		DataType:  "config",
+		Data:      kv,
+		Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+	}
+	_, err = datasource.GetBroker().GetTaskDao().Create(ctx, task, kv.Domain, kv.Project)
+	if err != nil {
+		openlog.Error(fmt.Sprintf("fail to create task, err:%s ", err))
+	}
+
 	err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
 	if err != nil {
 		openlog.Warn(