You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/11/20 10:12:04 UTC

[servicecomb-kie] 01/01: add config sync task

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git

commit defe7306ff15b9de7256889286ef8daf1fae8d92
Author: robotljw <79...@qq.com>
AuthorDate: Fri Nov 19 20:46:44 2021 +0800

    add config sync task
---
 examples/dev/kie-conf.yaml                 |  2 +
 pkg/model/db_schema.go                     |  9 +++
 server/config/struct.go                    |  1 +
 server/datasource/dao.go                   | 23 ++++++--
 server/datasource/etcd/init.go             |  1 +
 server/datasource/etcd/key/key.go          | 10 ++++
 server/datasource/etcd/kv/kv_dao.go        | 88 ++++++++++++++++++++++++++++++
 server/datasource/mongo/init.go            |  1 +
 server/datasource/mongo/kv/kv_dao.go       | 58 ++++++++++++++++++++
 server/datasource/mongo/session/session.go |  1 +
 server/service/kv/kv_svc.go                | 44 ++++++++++++---
 11 files changed, 225 insertions(+), 13 deletions(-)

diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 7d10d4a..ffd38b1 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -7,6 +7,8 @@ db:
   #   kind=embedded_etcd, then is the embedded etcd server's advertise-peer-urls, e.g. default=http://127.0.0.1:2380
   #uri: mongodb://kie:123@127.0.0.1:27017/kie
   uri: http://127.0.0.1:2379
+  # turn on the synchronization switch related operations will be written to the task in the db
+  syncEnabled: false
 #  poolSize: 10
 #  timeout: 5m
 #  sslEnabled: false
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 0434dcd..acfe42b 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -48,6 +48,15 @@ type KVDoc struct {
 	Domain string            `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"`              //redundant
 }
 
+// Task is db struct to store sync task
+type Task struct {
+	Action    string      `json:"action" bson:"action"`
+	DataType  string      `json:"data_type" bson:"data_type"`
+	Data      interface{} `json:"data" bson:"data"`
+	Timestamp string      `json:"timestamp" bson:"timestamp"`
+	Status    string      `json:"status" bson:"status"`
+}
+
 //ViewDoc is db struct, it saves user's custom view name and criteria
 type ViewDoc struct {
 	ID       string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
diff --git a/server/config/struct.go b/server/config/struct.go
index 357a84b..d19d2b1 100644
--- a/server/config/struct.go
+++ b/server/config/struct.go
@@ -41,6 +41,7 @@ type DB struct {
 	CertPwdFile string `yaml:"certPwdFile"`
 	Timeout     string `yaml:"timeout"`
 	VerifyPeer  bool   `yaml:"verifyPeer"`
+	SyncEnable  bool   `yaml:"syncEnabled"`
 }
 
 //RBAC is rbac config
diff --git a/server/datasource/dao.go b/server/datasource/dao.go
index 3ec3e74..6607131 100644
--- a/server/datasource/dao.go
+++ b/server/datasource/dao.go
@@ -35,12 +35,13 @@ var (
 )
 
 var (
-	ErrKeyNotExists     = errors.New("can not find any key value")
-	ErrRecordNotExists  = errors.New("can not find any polling data")
-	ErrRevisionNotExist = errors.New("revision does not exist")
-	ErrAliasNotGiven    = errors.New("label alias not given")
-	ErrKVAlreadyExists  = errors.New("kv already exists")
-	ErrTooMany          = errors.New("key with labels should be only one")
+	ErrKeyNotExists        = errors.New("can not find any key value")
+	ErrRecordNotExists     = errors.New("can not find any polling data")
+	ErrRevisionNotExist    = errors.New("revision does not exist")
+	ErrAliasNotGiven       = errors.New("label alias not given")
+	ErrKVAlreadyExists     = errors.New("kv already exists")
+	ErrKVTaskAlreadyExists = errors.New("kv or sync task already exists")
+	ErrTooMany             = errors.New("key with labels should be only one")
 )
 
 const (
@@ -73,6 +74,10 @@ type KVDao interface {
 	Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	Update(ctx context.Context, kv *model.KVDoc) error
 	List(ctx context.Context, project, domain string, options ...FindOption) (*model.KVResponse, error)
+
+	CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error)
+	UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error
+
 	//FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion
 	FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error)
 	//FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion
@@ -84,6 +89,12 @@ type KVDao interface {
 	Total(ctx context.Context, project, domain string) (int64, error)
 }
 
+// TaskDao provide api of Task entity
+type TaskDao interface {
+	Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error)
+	Update(ctx context.Context, task *model.Task, domain string, project string) error
+}
+
 //HistoryDao provide api of History entity
 type HistoryDao interface {
 	AddHistory(ctx context.Context, kv *model.KVDoc) error
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index fdd7596..622a314 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -69,6 +69,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
 func init() {
 	datasource.RegisterPlugin("etcd", NewFrom)
 	datasource.RegisterPlugin("embedded_etcd", NewFrom)
diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go
index 1df4665..bd7ac15 100644
--- a/server/datasource/etcd/key/key.go
+++ b/server/datasource/etcd/key/key.go
@@ -28,8 +28,18 @@ const (
 	keyCounter = "counter"
 	keyHistory = "kv-history"
 	keyTrack   = "track"
+	sync       = "sync"
+	task       = "task"
 )
 
+func getSyncRootKey() string {
+	return split + sync
+}
+
+func TaskKey(domain, project, timestamp string) string {
+	return strings.Join([]string{getSyncRootKey(), task, domain, project, timestamp}, split)
+}
+
 func KV(domain, project, kvID string) string {
 	return strings.Join([]string{keyKV, domain, project, kvID}, split)
 }
diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go
index d3f02b0..a52a37f 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -20,6 +20,7 @@ package kv
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"regexp"
 	"strings"
 
@@ -59,6 +60,93 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	return kv, nil
 }
 
+// CreateWithTask is used to create with the task after synchronization is turned on
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	kvBytes, err := json.Marshal(kv)
+	if err != nil {
+		openlog.Error("fail to marshal kv")
+		return nil, err
+	}
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error("fail to marshal task ")
+		return nil, err
+	}
+	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)), etcdadpt.WithValue(kvBytes))
+	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+	kvOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(kvOpPut.Key), etcdadpt.CmpEqual, 0)
+	taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
+	resp, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{kvOpCmp, taskOpCmp}, nil)
+	if err != nil {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  err.Error(),
+			"kv":   kv,
+			"task": task,
+		}))
+		return nil, err
+	}
+	if !resp.Succeeded {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  datasource.ErrKVTaskAlreadyExists.Error(),
+			"kv":   kv,
+			"task": task,
+		}))
+		return nil, datasource.ErrKVTaskAlreadyExists
+	}
+	return kv, nil
+}
+
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+	keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
+	resp, err := etcdadpt.Get(ctx, keyKv)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	if resp == nil {
+		return datasource.ErrRecordNotExists
+	}
+
+	var old model.KVDoc
+	err = json.Unmarshal(resp.Value, &old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	old.LabelFormat = kv.LabelFormat
+	old.Value = kv.Value
+	old.Status = kv.Status
+	old.Checker = kv.Checker
+	old.UpdateTime = kv.UpdateTime
+	old.UpdateRevision = kv.UpdateRevision
+
+	kvBytes, err := json.Marshal(old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+
+	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKv), etcdadpt.WithValue(kvBytes))
+	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+	kvOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(kvOpPut.Key), etcdadpt.CmpEqual, 0)
+	taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
+	response, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{kvOpCmp, taskOpCmp}, nil)
+	openlog.Info("44444444444444444444")
+	openlog.Info(fmt.Sprintf("%v",response))
+	//err = etcdadpt.PutBytes(ctx, keyKv, bytes)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	return nil
+}
+
 //Update update key value
 func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
 	keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index 230bc8e..2b4ae33 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -46,6 +46,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
 func init() {
 	datasource.RegisterPlugin("mongo", NewFrom)
 }
diff --git a/server/datasource/mongo/kv/kv_dao.go b/server/datasource/mongo/kv/kv_dao.go
index 22d832e..9a13813 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -57,6 +57,64 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	return kv, nil
 }
 
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	taskSession, err := session.GetDB().Client().StartSession()
+	if err != nil {
+		return nil, err
+	}
+	if err = taskSession.StartTransaction(); err != nil {
+		return nil, err
+	}
+	defer taskSession.EndSession(ctx)
+	if err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
+		collection := session.GetDB().Collection(session.CollectionKV)
+		_, err = collection.InsertOne(sessionContext, kv)
+		if err != nil {
+			openlog.Error("create error", openlog.WithTags(openlog.Tags{
+				"err": err.Error(),
+				"kv":  kv,
+			}))
+			errAbort := taskSession.AbortTransaction(sessionContext)
+			if errAbort != nil {
+				openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{
+					"err": errAbort.Error(),
+					"kv":  kv,
+				}))
+			}
+			return err
+		}
+
+		collection = session.GetDB().Collection(session.CollectionTask)
+		_, err = collection.InsertOne(sessionContext, task)
+		if err != nil {
+			openlog.Error("create error", openlog.WithTags(openlog.Tags{
+				"err":  err.Error(),
+				"task": task,
+			}))
+			errAbort := taskSession.AbortTransaction(sessionContext)
+			if errAbort != nil {
+				openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{
+					"err":  errAbort.Error(),
+					"task": task,
+				}))
+			}
+			return err
+		}
+		if err = taskSession.CommitTransaction(sessionContext); err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		openlog.Error(err.Error())
+		return nil, err
+	}
+	return kv, nil
+}
+
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+	return nil
+}
+
 //Update update key value
 func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
 	collection := session.GetDB().Collection(session.CollectionKV)
diff --git a/server/datasource/mongo/session/session.go b/server/datasource/mongo/session/session.go
index 94a4ce9..75c5bd3 100644
--- a/server/datasource/mongo/session/session.go
+++ b/server/datasource/mongo/session/session.go
@@ -49,6 +49,7 @@ const (
 	CollectionPollingDetail = "polling_detail"
 	CollectionCounter       = "counter"
 	CollectionView          = "view"
+	CollectionTask          = "task"
 )
 
 //db errors
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index 3ed06ec..3a573d2 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,12 +20,14 @@ package kv
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/apache/servicecomb-kie/pkg/common"
 	"github.com/apache/servicecomb-kie/pkg/concurrency"
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/pkg/stringutil"
+	cfg "github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
 	"github.com/apache/servicecomb-kie/server/pubsub"
 	"github.com/go-chassis/cari/config"
@@ -111,10 +113,25 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
 		openlog.Error(err.Error())
 		return nil, config.NewError(config.ErrInternal, "create kv failed")
 	}
-	kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
-	if err != nil {
-		openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
-		return nil, config.NewError(config.ErrInternal, "create kv failed")
+	// open synchronization needs to write tasks to db
+	if cfg.GetDB().SyncEnable {
+		task := &model.Task{
+			Action:    "create",
+			DataType:  "config",
+			Data:      kv,
+			Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+		}
+		kv, err = datasource.GetBroker().GetKVDao().CreateWithTask(ctx, kv, task)
+		if err != nil {
+			openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+			return nil, config.NewError(config.ErrInternal, "create kv failed")
+		}
+	} else {
+		kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
+		if err != nil {
+			openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+			return nil, config.NewError(config.ErrInternal, "create kv failed")
+		}
 	}
 	err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
 	if err != nil {
@@ -228,9 +245,22 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
 	if err != nil {
 		return nil, err
 	}
-	err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
-	if err != nil {
-		return nil, err
+	if cfg.GetDB().SyncEnable {
+		task := &model.Task{
+			Action:    "update",
+			DataType:  "config",
+			Data:      kv,
+			Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+		}
+		err = datasource.GetBroker().GetKVDao().UpdateWithTask(ctx, oldKV, task)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
+		if err != nil {
+			return nil, err
+		}
 	}
 	openlog.Info(
 		fmt.Sprintf("update %s with labels %s value [%s]",