You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/11/20 10:12:03 UTC

[servicecomb-kie] branch master updated (4196c12 -> defe730)

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git.


 discard 4196c12  add config sync task
     new defe730  add config sync task

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4196c12)
            \
             N -- N -- N   refs/heads/master (defe730)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/dev/kie-conf.yaml                 |  2 +
 server/config/struct.go                    |  1 +
 server/datasource/dao.go                   | 19 ++++---
 server/datasource/etcd/init.go             |  5 --
 server/datasource/etcd/key/key.go          |  8 ++-
 server/datasource/etcd/kv/kv_dao.go        | 88 ++++++++++++++++++++++++++++++
 server/datasource/etcd/task/task_dao.go    | 42 --------------
 server/datasource/mongo/init.go            |  5 --
 server/datasource/mongo/kv/kv_dao.go       | 58 ++++++++++++++++++++
 server/datasource/mongo/session/session.go |  1 +
 server/datasource/mongo/task/task_dao.go   | 17 ------
 server/service/kv/kv_svc.go                | 54 ++++++++++++------
 12 files changed, 203 insertions(+), 97 deletions(-)
 delete mode 100644 server/datasource/etcd/task/task_dao.go
 delete mode 100644 server/datasource/mongo/task/task_dao.go

[servicecomb-kie] 01/01: add config sync task

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git

commit defe7306ff15b9de7256889286ef8daf1fae8d92
Author: robotljw <79...@qq.com>
AuthorDate: Fri Nov 19 20:46:44 2021 +0800

    add config sync task
---
 examples/dev/kie-conf.yaml                 |  2 +
 pkg/model/db_schema.go                     |  9 +++
 server/config/struct.go                    |  1 +
 server/datasource/dao.go                   | 23 ++++++--
 server/datasource/etcd/init.go             |  1 +
 server/datasource/etcd/key/key.go          | 10 ++++
 server/datasource/etcd/kv/kv_dao.go        | 88 ++++++++++++++++++++++++++++++
 server/datasource/mongo/init.go            |  1 +
 server/datasource/mongo/kv/kv_dao.go       | 58 ++++++++++++++++++++
 server/datasource/mongo/session/session.go |  1 +
 server/service/kv/kv_svc.go                | 44 ++++++++++++---
 11 files changed, 225 insertions(+), 13 deletions(-)

diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 7d10d4a..ffd38b1 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -7,6 +7,8 @@ db:
   #   kind=embedded_etcd, then is the embedded etcd server's advertise-peer-urls, e.g. default=http://127.0.0.1:2380
   #uri: mongodb://kie:123@127.0.0.1:27017/kie
   uri: http://127.0.0.1:2379
+  # when syncEnabled is true, kv create and update operations are also written to the db as sync tasks
+  syncEnabled: false
 #  poolSize: 10
 #  timeout: 5m
 #  sslEnabled: false
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 0434dcd..acfe42b 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -48,6 +48,15 @@ type KVDoc struct {
 	Domain string            `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"`              //redundant
 }
 
+// Task is the db struct that stores one sync task; the kv service builds it when db sync is enabled
+type Task struct {
+	Action    string      `json:"action" bson:"action"`       // operation kind; the kv service sets "create" or "update"
+	DataType  string      `json:"data_type" bson:"data_type"` // payload kind; the kv service sets "config"
+	Data      interface{} `json:"data" bson:"data"`           // payload of the operation, as passed by the caller
+	Timestamp string      `json:"timestamp" bson:"timestamp"` // unix seconds in decimal; last segment of the etcd task key — NOTE(review): two tasks of one project in the same second would share that key
+	Status    string      `json:"status" bson:"status"`       // never assigned in this change — TODO confirm the intended values
+}
+
 //ViewDoc is db struct, it saves user's custom view name and criteria
 type ViewDoc struct {
 	ID       string `json:"id,omitempty" bson:"id,omitempty" yaml:"id,omitempty" swag:"string"`
diff --git a/server/config/struct.go b/server/config/struct.go
index 357a84b..d19d2b1 100644
--- a/server/config/struct.go
+++ b/server/config/struct.go
@@ -41,6 +41,7 @@ type DB struct {
 	CertPwdFile string `yaml:"certPwdFile"`
 	Timeout     string `yaml:"timeout"`
 	VerifyPeer  bool   `yaml:"verifyPeer"`
+	SyncEnable  bool   `yaml:"syncEnabled"`
 }
 
 //RBAC is rbac config
diff --git a/server/datasource/dao.go b/server/datasource/dao.go
index 3ec3e74..6607131 100644
--- a/server/datasource/dao.go
+++ b/server/datasource/dao.go
@@ -35,12 +35,13 @@ var (
 )
 
 var (
-	ErrKeyNotExists     = errors.New("can not find any key value")
-	ErrRecordNotExists  = errors.New("can not find any polling data")
-	ErrRevisionNotExist = errors.New("revision does not exist")
-	ErrAliasNotGiven    = errors.New("label alias not given")
-	ErrKVAlreadyExists  = errors.New("kv already exists")
-	ErrTooMany          = errors.New("key with labels should be only one")
+	ErrKeyNotExists        = errors.New("can not find any key value")
+	ErrRecordNotExists     = errors.New("can not find any polling data")
+	ErrRevisionNotExist    = errors.New("revision does not exist")
+	ErrAliasNotGiven       = errors.New("label alias not given")
+	ErrKVAlreadyExists     = errors.New("kv already exists")
+	ErrKVTaskAlreadyExists = errors.New("kv or sync task already exists")
+	ErrTooMany             = errors.New("key with labels should be only one")
 )
 
 const (
@@ -73,6 +74,10 @@ type KVDao interface {
 	Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	Update(ctx context.Context, kv *model.KVDoc) error
 	List(ctx context.Context, project, domain string, options ...FindOption) (*model.KVResponse, error)
+
+	CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error)
+	UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error
+
 	//FindOneAndDelete deletes one kv by id and return the deleted kv as these appeared before deletion
 	FindOneAndDelete(ctx context.Context, kvID string, project, domain string) (*model.KVDoc, error)
 	//FindManyAndDelete deletes multiple kvs and return the deleted kv list as these appeared before deletion
@@ -84,6 +89,12 @@ type KVDao interface {
 	Total(ctx context.Context, project, domain string) (int64, error)
 }
 
+// TaskDao provides the api of the Task entity. NOTE(review): this change adds no implementation or caller of it — confirm it is still needed
+type TaskDao interface {
+	Create(ctx context.Context, task *model.Task, domain string, project string) (*model.Task, error)
+	Update(ctx context.Context, task *model.Task, domain string, project string) error
+}
+
 //HistoryDao provide api of History entity
 type HistoryDao interface {
 	AddHistory(ctx context.Context, kv *model.KVDoc) error
diff --git a/server/datasource/etcd/init.go b/server/datasource/etcd/init.go
index fdd7596..622a314 100644
--- a/server/datasource/etcd/init.go
+++ b/server/datasource/etcd/init.go
@@ -69,6 +69,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
 func init() {
 	datasource.RegisterPlugin("etcd", NewFrom)
 	datasource.RegisterPlugin("embedded_etcd", NewFrom)
diff --git a/server/datasource/etcd/key/key.go b/server/datasource/etcd/key/key.go
index 1df4665..bd7ac15 100644
--- a/server/datasource/etcd/key/key.go
+++ b/server/datasource/etcd/key/key.go
@@ -28,8 +28,18 @@ const (
 	keyCounter = "counter"
 	keyHistory = "kv-history"
 	keyTrack   = "track"
+	sync       = "sync"
+	task       = "task"
 )
 
+// getSyncRootKey returns the root prefix of the sync keys: split followed by sync
+func getSyncRootKey() string { return split + sync }
+
+// TaskKey returns the sync root key followed by task, domain, project and timestamp, all separated by split
+func TaskKey(domain, project, timestamp string) string {
+	return getSyncRootKey() + split + strings.Join([]string{task, domain, project, timestamp}, split)
+}
+
 func KV(domain, project, kvID string) string {
 	return strings.Join([]string{keyKV, domain, project, kvID}, split)
 }
diff --git a/server/datasource/etcd/kv/kv_dao.go b/server/datasource/etcd/kv/kv_dao.go
index d3f02b0..a52a37f 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -20,6 +20,7 @@ package kv
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"regexp"
 	"strings"
 
@@ -59,6 +60,93 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	return kv, nil
 }
 
+// CreateWithTask stores the kv and its sync task in a single etcd transaction.
+// The transaction only applies when neither key exists yet; otherwise
+// datasource.ErrKVTaskAlreadyExists is returned.
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	kvValue, err := json.Marshal(kv)
+	if err != nil {
+		openlog.Error("fail to marshal kv")
+		return nil, err
+	}
+	taskValue, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error("fail to marshal task ")
+		return nil, err
+	}
+	putKV := etcdadpt.OpPut(etcdadpt.WithStrKey(key.KV(kv.Domain, kv.Project, kv.ID)), etcdadpt.WithValue(kvValue))
+	putTask := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskValue))
+	// a create revision of 0 means the key does not exist
+	absent := []etcdadpt.CmpOptions{
+		etcdadpt.OpCmp(etcdadpt.CmpCreateRev(putKV.Key), etcdadpt.CmpEqual, 0),
+		etcdadpt.OpCmp(etcdadpt.CmpCreateRev(putTask.Key), etcdadpt.CmpEqual, 0),
+	}
+	resp, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{putKV, putTask}, absent, nil)
+	if err == nil && !resp.Succeeded {
+		err = datasource.ErrKVTaskAlreadyExists
+	}
+	if err != nil {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  err.Error(),
+			"kv":   kv,
+			"task": task,
+		}))
+		return nil, err
+	}
+	return kv, nil
+}
+
+// UpdateWithTask overwrites the mutable fields of a stored kv and records its sync task in one etcd transaction
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+	keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
+	resp, err := etcdadpt.Get(ctx, keyKv)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	if resp == nil {
+		return datasource.ErrRecordNotExists
+	}
+
+	var old model.KVDoc
+	err = json.Unmarshal(resp.Value, &old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	old.LabelFormat = kv.LabelFormat
+	old.Value = kv.Value
+	old.Status = kv.Status
+	old.Checker = kv.Checker
+	old.UpdateTime = kv.UpdateTime
+	old.UpdateRevision = kv.UpdateRevision
+
+	kvBytes, err := json.Marshal(old)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	kvOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(keyKv), etcdadpt.WithValue(kvBytes))
+	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(kv.Domain, kv.Project, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+	// the kv exists at this point, so only the task key is required to be absent (create revision 0)
+	taskOpCmp := etcdadpt.OpCmp(etcdadpt.CmpCreateRev(taskOpPut.Key), etcdadpt.CmpEqual, 0)
+	response, err := etcdadpt.Instance().TxnWithCmp(ctx, []etcdadpt.OpOptions{kvOpPut, taskOpPut}, []etcdadpt.CmpOptions{taskOpCmp}, nil)
+	if err != nil {
+		openlog.Error(err.Error())
+		return err
+	}
+	if !response.Succeeded {
+		openlog.Error(fmt.Sprintf("update kv [%s] failed: sync task [%s] already exists", kv.ID, task.Timestamp))
+		return datasource.ErrKVTaskAlreadyExists
+	}
+	return nil
+}
+
 //Update update key value
 func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
 	keyKv := key.KV(kv.Domain, kv.Project, kv.ID)
diff --git a/server/datasource/mongo/init.go b/server/datasource/mongo/init.go
index 230bc8e..2b4ae33 100644
--- a/server/datasource/mongo/init.go
+++ b/server/datasource/mongo/init.go
@@ -46,6 +46,7 @@ func (*Broker) GetHistoryDao() datasource.HistoryDao {
 func (*Broker) GetTrackDao() datasource.TrackDao {
 	return &track.Dao{}
 }
+
 func init() {
 	datasource.RegisterPlugin("mongo", NewFrom)
 }
diff --git a/server/datasource/mongo/kv/kv_dao.go b/server/datasource/mongo/kv/kv_dao.go
index 22d832e..9a13813 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -57,6 +57,64 @@ func (s *Dao) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, error)
 	return kv, nil
 }
 
+// CreateWithTask inserts the kv and its sync task in one mongo transaction and returns the kv
+func (s *Dao) CreateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) (*model.KVDoc, error) {
+	err := withTaskTransaction(ctx, task, func(sessionContext mongo.SessionContext) error {
+		_, err := session.GetDB().Collection(session.CollectionKV).InsertOne(sessionContext, kv)
+		return err
+	})
+	if err != nil {
+		openlog.Error("create error", openlog.WithTags(openlog.Tags{
+			"err":  err.Error(),
+			"kv":   kv,
+			"task": task,
+		}))
+		return nil, err
+	}
+	return kv, nil
+}
+
+// UpdateWithTask applies Update to the kv and inserts its sync task in one mongo transaction
+func (s *Dao) UpdateWithTask(ctx context.Context, kv *model.KVDoc, task *model.Task) error {
+	err := withTaskTransaction(ctx, task, func(sessionContext mongo.SessionContext) error {
+		// NOTE(review): assumes Update hands its ctx to the driver so the write joins the transaction — confirm
+		return s.Update(sessionContext, kv)
+	})
+	if err != nil {
+		openlog.Error("update error", openlog.WithTags(openlog.Tags{
+			"err": err.Error(), "kv": kv, "task": task,
+		}))
+	}
+	return err
+}
+
+// withTaskTransaction runs writeKV and then inserts task inside one mongo transaction;
+// the transaction is aborted when either step fails and committed otherwise
+func withTaskTransaction(ctx context.Context, task *model.Task, writeKV func(mongo.SessionContext) error) error {
+	taskSession, err := session.GetDB().Client().StartSession()
+	if err != nil {
+		return err
+	}
+	// deferred before StartTransaction so the session is released on every return path
+	defer taskSession.EndSession(ctx)
+	if err = taskSession.StartTransaction(); err != nil {
+		return err
+	}
+	return mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
+		txnErr := writeKV(sessionContext)
+		if txnErr == nil {
+			_, txnErr = session.GetDB().Collection(session.CollectionTask).InsertOne(sessionContext, task)
+		}
+		if txnErr != nil {
+			if errAbort := taskSession.AbortTransaction(sessionContext); errAbort != nil {
+				openlog.Error("fail to abort transaction", openlog.WithTags(openlog.Tags{"err": errAbort.Error()}))
+			}
+			return txnErr
+		}
+		return taskSession.CommitTransaction(sessionContext)
+	})
+}
+
 //Update update key value
 func (s *Dao) Update(ctx context.Context, kv *model.KVDoc) error {
 	collection := session.GetDB().Collection(session.CollectionKV)
diff --git a/server/datasource/mongo/session/session.go b/server/datasource/mongo/session/session.go
index 94a4ce9..75c5bd3 100644
--- a/server/datasource/mongo/session/session.go
+++ b/server/datasource/mongo/session/session.go
@@ -49,6 +49,7 @@ const (
 	CollectionPollingDetail = "polling_detail"
 	CollectionCounter       = "counter"
 	CollectionView          = "view"
+	CollectionTask          = "task"
 )
 
 //db errors
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index 3ed06ec..3a573d2 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,12 +20,14 @@ package kv
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/apache/servicecomb-kie/pkg/common"
 	"github.com/apache/servicecomb-kie/pkg/concurrency"
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/pkg/stringutil"
+	cfg "github.com/apache/servicecomb-kie/server/config"
 	"github.com/apache/servicecomb-kie/server/datasource"
 	"github.com/apache/servicecomb-kie/server/pubsub"
 	"github.com/go-chassis/cari/config"
@@ -111,10 +113,25 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
 		openlog.Error(err.Error())
 		return nil, config.NewError(config.ErrInternal, "create kv failed")
 	}
-	kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
-	if err != nil {
-		openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
-		return nil, config.NewError(config.ErrInternal, "create kv failed")
+	// open synchronization needs to write tasks to db
+	if cfg.GetDB().SyncEnable {
+		task := &model.Task{
+			Action:    "create",
+			DataType:  "config",
+			Data:      kv,
+			Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+		}
+		kv, err = datasource.GetBroker().GetKVDao().CreateWithTask(ctx, kv, task)
+		if err != nil {
+			openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+			return nil, config.NewError(config.ErrInternal, "create kv failed")
+		}
+	} else {
+		kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
+		if err != nil {
+			openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+			return nil, config.NewError(config.ErrInternal, "create kv failed")
+		}
 	}
 	err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
 	if err != nil {
@@ -228,9 +245,22 @@ func Update(ctx context.Context, kv *model.UpdateKVRequest) (*model.KVDoc, error
 	if err != nil {
 		return nil, err
 	}
-	err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
-	if err != nil {
-		return nil, err
+	if cfg.GetDB().SyncEnable {
+		task := &model.Task{
+			Action:    "update",
+			DataType:  "config",
+			Data:      kv,
+			Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
+		}
+		err = datasource.GetBroker().GetKVDao().UpdateWithTask(ctx, oldKV, task)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		err = datasource.GetBroker().GetKVDao().Update(ctx, oldKV)
+		if err != nil {
+			return nil, err
+		}
 	}
 	openlog.Info(
 		fmt.Sprintf("update %s with labels %s value [%s]",