You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2022/01/18 09:09:04 UTC

[GitHub] [servicecomb-service-center] aseTo2016 opened a new pull request #1229: #1228 sync model

aseTo2016 opened a new pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229


   1. event manager
   2. task manager
   3. replicator manager
   4. resource
   
   Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/browse/SCB) filed for the change (usually before you start working on it).  Trivial changes like typos do not require a JIRA issue.  Your pull request should address just this issue, without pulling in other changes.
    - [ ] Each commit in the pull request should have a meaningful subject line and body.
    - [ ] Format the pull request title like `[SCB-XXX] Fixes bug in ApproximateQuantiles`, where you replace `SCB-XXX` with the appropriate JIRA issue.
    - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
    - [ ] Run `go build` `go test` `go fmt` `go vet` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
    - [ ] Never comment out source code; delete it instead.
    - [ ] UT should have "context, subject, expected result" as the test case name when you call t.Run().
   ---
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] aseTo2016 commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
aseTo2016 commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787496996



##########
File path: syncer/service/replicator/replicator_test.go
##########
@@ -0,0 +1,86 @@
+package replicator
+
+import (
+	"context"
+	"testing"
+
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_replicatorManager_Persist(t *testing.T) {
+	ctx := context.TODO()
+	el := &v1sync.EventList{
+		Events: nil,
+	}
+	r := manager.Persist(ctx, el)
+	assert.Equal(t, 0, len(r))
+
+	resource.RegisterResources("fork", func(event *v1sync.Event) resource.Resource {
+		return &forkResources{
+			loadCurrentResourceResult: nil,
+			needOperateResult:         nil,
+			operateResult:             resource.SuccessResult(),
+		}
+	})
+
+	el = &v1sync.EventList{
+		Events: []*v1sync.Event{
+			{
+				Id:        "xxx1",
+				Action:    "",
+				Subject:   "fork",
+				Opts:      nil,
+				Value:     nil,
+				Timestamp: v1sync.Timestamp(),
+			},
+		},
+	}
+
+	r = manager.Persist(ctx, el)
+	if assert.Equal(t, 1, len(r)) {
+		assert.Equal(t, resource.SuccessResult().WithEventID("xxx1"), r[0])
+	}
+
+	el = &v1sync.EventList{
+		Events: []*v1sync.Event{
+			{
+				Id:        "xxx1",
+				Action:    "",
+				Subject:   "not exist",
+				Opts:      nil,
+				Value:     nil,
+				Timestamp: v1sync.Timestamp(),
+			},
+		},
+	}
+
+	r = manager.Persist(ctx, el)
+	if assert.Equal(t, 1, len(r)) {
+		assert.Equal(t, resource.Skip, r[0].Status)
+	}
+}
+
+type forkResources struct {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787508021



##########
File path: syncer/service/event/manager.go
##########
@@ -0,0 +1,253 @@
+package event
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+
+	"github.com/go-chassis/foundation/gopool"
+)
+
+const (
+	defaultInternal = 500 * time.Millisecond
+)
+
+var m Manager
+
+type Event struct {
+	*v1sync.Event
+
+	Result chan<- *Result
+}
+
+type Result struct {
+	ID    string
+	Data  *v1sync.Result
+	Error error
+}
+
+func Work() {
+	m = NewManager()
+	m.HandleEvent()
+	m.HandleResult()
+}
+
+func GetManager() Manager {
+	return m
+}
+
+type ManagerOption func(*managerOptions)
+
+type managerOptions struct {
+	internal   time.Duration
+	replicator replicator.Replicator
+}
+
+func ManagerInternal(i time.Duration) ManagerOption {
+	return func(options *managerOptions) {
+		options.internal = i
+	}
+}
+
+func toManagerOptions(os ...ManagerOption) *managerOptions {
+	mo := new(managerOptions)
+	mo.internal = defaultInternal
+	mo.replicator = replicator.Manager()
+	for _, o := range os {
+		o(mo)
+	}
+
+	return mo
+}
+
+func Replicator(r replicator.Replicator) ManagerOption {
+	return func(options *managerOptions) {
+		options.replicator = r
+	}
+}
+
+func NewManager(os ...ManagerOption) Manager {
+	mo := toManagerOptions(os...)
+	em := &eventManager{
+		events:     make(chan *Event, 1000),
+		result:     make(chan *Result, 1000),
+		internal:   mo.internal,
+		replicator: mo.replicator,
+	}
+	return em
+}
+
+type Sender interface {

Review comment:
       接口级注释十分重要




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787361151



##########
File path: syncer/service/task/manager_test.go
##########
@@ -0,0 +1,66 @@
+package task
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/servicecomb-service-center/syncer/service/event"
+
+	"github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewManager(t *testing.T) {
+	receiver := make(chan struct{}, 1)
+	fs := &forkSender{
+		events:  make(map[string]*event.Event),
+		receive: receiver,
+	}
+	ctx := context.TODO()
+	m := NewManager(
+		ManagerOperator(&forkOperator{
+			tasks: map[string]*sync.Task{
+				"xxx1": {
+					ID:           "xxx1",
+					ResourceType: "demo",
+					Action:       "create",
+					Timestamp:    0,
+					Status:       "pending",
+				},
+			},
+		}),
+		ManagerInternal(defaultInternal),
+		EventSender(fs))
+
+	m.LoadAndHandleTask(ctx)
+	m.UpdateResultTask(ctx)
+	<-receiver
+	assert.Equal(t, 1, len(fs.events))
+}
+
+type forkOperator struct {

Review comment:
       misspelling: "fork" should be "fake"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787505061



##########
File path: server/handler/context/context.go
##########
@@ -32,6 +33,16 @@ const (
 	queryCacheOnly = "cacheOnly"
 )
 
+var enableSync string
+
+func init() {
+	enable := config.GetBool("sync.enableOnStart", false)
+	if enable {
+		enableSync = "1"

Review comment:
       魔鬼数字




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r786594212



##########
File path: syncer/rpc/server.go
##########
@@ -33,16 +36,40 @@ const (
 	HealthStatusClose     = "CLOSE"
 )
 
+func NewServer() *Server {
+	return &Server{
+		replicator: replicator.Manager(),

Review comment:
       我还没往下看一层,但是replicator.Instance() 或者replicator.New()是不是更容易理解




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] little-cui commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
little-cui commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787416469



##########
File path: syncer/service/task/task.go
##########
@@ -0,0 +1,38 @@
+package task
+
+import (
+	"context"
+
+	"github.com/apache/servicecomb-service-center/eventbase/datasource"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	serverconfig "github.com/apache/servicecomb-service-center/server/config"
+
+	carisync "github.com/go-chassis/cari/sync"
+)
+
+func initDatabase() {
+	kind := serverconfig.GetString("registry.kind", "",
+		serverconfig.WithStandby("registry_plugin"))
+
+	if err := datasource.Init(kind); err != nil {
+		log.Fatal("init datasource failed", err)
+	}
+}
+
+func ListTask(ctx context.Context) ([]*carisync.Task, error) {
+	return datasource.GetTaskDao().List(ctx)

Review comment:
       使用eventbase的service

##########
File path: syncer/service/task/lock.go
##########
@@ -0,0 +1,72 @@
+package task
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	datasourcedlock "github.com/apache/servicecomb-service-center/datasource/dlock"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/server/service/dlock"
+
+	"github.com/go-chassis/foundation/gopool"
+)
+
+type DistributedLock struct {
+	heartbeatDuration time.Duration
+	ttl               int64
+	do                func(ctx context.Context)
+	key               string
+	isLock            bool
+}
+
+func (dl *DistributedLock) LockDo() {
+	gopool.Go(func(goctx context.Context) {
+		log.Info("start lock key " + dl.key)
+		ticker := time.NewTicker(dl.heartbeatDuration)
+		var ctx context.Context
+		var cancel context.CancelFunc
+		failCount := 0
+		for {
+			select {
+			case <-ticker.C:
+				if !dl.isLock {
+					err := dlock.TryLock(dl.key, dl.ttl)
+					if err != nil {
+						continue
+					}
+
+					ctx, cancel = context.WithCancel(context.Background())
+					dl.do(ctx)
+					dl.isLock = true
+					continue
+				}
+
+				err := dlock.Renew(dl.key)

Review comment:
       使用cari的dlock

##########
File path: syncer/service/task/task.go
##########
@@ -0,0 +1,38 @@
+package task
+
+import (
+	"context"
+
+	"github.com/apache/servicecomb-service-center/eventbase/datasource"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	serverconfig "github.com/apache/servicecomb-service-center/server/config"
+
+	carisync "github.com/go-chassis/cari/sync"
+)
+
+func initDatabase() {
+	kind := serverconfig.GetString("registry.kind", "",
+		serverconfig.WithStandby("registry_plugin"))
+
+	if err := datasource.Init(kind); err != nil {

Review comment:
       使用cari的db,去servicecenter的config依赖

##########
File path: syncer/service/replicator/replicator_test.go
##########
@@ -0,0 +1,86 @@
+package replicator
+
+import (
+	"context"
+	"testing"
+
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_replicatorManager_Persist(t *testing.T) {
+	ctx := context.TODO()
+	el := &v1sync.EventList{
+		Events: nil,
+	}
+	r := manager.Persist(ctx, el)
+	assert.Equal(t, 0, len(r))
+
+	resource.RegisterResources("fork", func(event *v1sync.Event) resource.Resource {
+		return &forkResources{
+			loadCurrentResourceResult: nil,
+			needOperateResult:         nil,
+			operateResult:             resource.SuccessResult(),
+		}
+	})
+
+	el = &v1sync.EventList{
+		Events: []*v1sync.Event{
+			{
+				Id:        "xxx1",
+				Action:    "",
+				Subject:   "fork",
+				Opts:      nil,
+				Value:     nil,
+				Timestamp: v1sync.Timestamp(),
+			},
+		},
+	}
+
+	r = manager.Persist(ctx, el)
+	if assert.Equal(t, 1, len(r)) {
+		assert.Equal(t, resource.SuccessResult().WithEventID("xxx1"), r[0])
+	}
+
+	el = &v1sync.EventList{
+		Events: []*v1sync.Event{
+			{
+				Id:        "xxx1",
+				Action:    "",
+				Subject:   "not exist",
+				Opts:      nil,
+				Value:     nil,
+				Timestamp: v1sync.Timestamp(),
+			},
+		},
+	}
+
+	r = manager.Persist(ctx, el)
+	if assert.Equal(t, 1, len(r)) {
+		assert.Equal(t, resource.Skip, r[0].Status)
+	}
+}
+
+type forkResources struct {

Review comment:
       fake?

##########
File path: syncer/service/replicator/resource/kv.go
##########
@@ -0,0 +1,170 @@
+package resource
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	svcconfig "github.com/apache/servicecomb-service-center/server/config"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+
+	"github.com/little-cui/etcdadpt"
+)
+
+const (
+	KV = "kv"
+)
+
+const (
+	KVKey         = "key"
+	KVKeyNonExist = "key not exist in opts"
+)
+
+var (
+	manager KeyManager
+
+	ErrNotImplement   = errors.New("not implement")
+	ErrRecordNonExist = errors.New("record non exist")
+)
+
+func NewKV(e *v1sync.Event) Resource {
+	r := &kv{
+		event:   e,
+		manager: keyManage(),
+	}
+	return r
+}
+
+type kv struct {
+	event *v1sync.Event
+	key   string
+
+	manager         KeyManager
+	tombstoneLoader tombstoneLoader
+
+	cur []byte
+
+	defaultFailHandler
+}
+
+func (k *kv) LoadCurrentResource(ctx context.Context) *Result {
+	key, ok := k.event.Opts[KVKey]
+	if !ok {
+		return NewResult(Fail, KVKeyNonExist)
+	}
+	k.key = key
+
+	value, err := k.manager.Get(ctx, key)
+	if err != nil {
+		if errors.Is(err, ErrRecordNonExist) {
+			return nil
+		}
+		return FailResult(err)
+	}
+	k.cur = value
+	return nil
+}
+
+func (k *kv) NeedOperate(ctx context.Context) *Result {
+	c := &checker{
+		curNotNil:  k.cur != nil,
+		event:      k.event,
+		updateTime: nil,
+		resourceID: k.key,
+	}
+	c.tombstoneLoader = c
+	if k.tombstoneLoader != nil {
+		c.tombstoneLoader = k.tombstoneLoader
+	}
+
+	return c.needOperate(ctx)
+}
+
+func (k *kv) CreateHandle(ctx context.Context) error {
+	return k.manager.Post(ctx, k.key, k.event.Value)
+}
+
+func (k *kv) UpdateHandle(ctx context.Context) error {
+	return k.manager.Put(ctx, k.key, k.event.Value)
+}
+
+func (k *kv) DeleteHandle(ctx context.Context) error {
+	return k.manager.Delete(ctx, k.key)
+}
+
+var once sync.Once
+
+func keyManage() KeyManager {
+	once.Do(InitManager)
+	return manager
+}
+
+func (k *kv) Operate(ctx context.Context) *Result {
+	return newOperator(k).operate(ctx, k.event.Action)
+}
+
+type KeyManager interface {
+	Get(ctx context.Context, key string) ([]byte, error)
+	Put(ctx context.Context, key string, value []byte) error
+	Post(ctx context.Context, key string, value []byte) error
+	Delete(ctx context.Context, key string) error
+}
+
+type mongoManager struct {
+}
+
+func (m *mongoManager) Get(_ context.Context, _ string) ([]byte, error) {
+	return nil, ErrNotImplement
+}
+
+func (m *mongoManager) Put(_ context.Context, _ string, _ []byte) error {
+	return ErrNotImplement
+}
+
+func (m *mongoManager) Post(_ context.Context, _ string, _ []byte) error {
+	return ErrNotImplement
+}
+
+func (m *mongoManager) Delete(_ context.Context, _ string) error {
+	return ErrNotImplement
+}
+
+type etcdManager struct {
+}
+
+func InitManager() {
+	kind := svcconfig.GetString("registry.kind", "",
+		svcconfig.WithStandby("registry_plugin"))
+	if kind == "etcd" {
+		manager = new(etcdManager)
+		return
+	}
+
+	manager = new(mongoManager)

Review comment:
       切cari的db模块后,不需要执行init,去servicecenter的config依赖




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] little-cui commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
little-cui commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787304011



##########
File path: syncer/service/event/manager.go
##########
@@ -0,0 +1,253 @@
+package event
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+
+	"github.com/go-chassis/foundation/gopool"
+)
+
+const (
+	defaultInternal = 500 * time.Microsecond

Review comment:
       应该是毫秒单位




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang merged pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang merged pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r786591585



##########
File path: datasource/manager.go
##########
@@ -66,6 +67,7 @@ func Init(opts Options) error {
 		return err
 	}
 
+	EnableSync = config.GetBool("syncer.enabled", false)

Review comment:
       先放到runtime包下,运行时的状态不能分散管理

##########
File path: client/set.go
##########
@@ -1,7 +1,7 @@
 package client
 
 import (
-	v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"

Review comment:
       syncer并没有拆库的打算,而是打算长期采用mono repo的管理方式,syncer的api,都扔到代码库根目录下统一管理才是对的




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] aseTo2016 commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
aseTo2016 commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787498282



##########
File path: syncer/rpc/server.go
##########
@@ -33,16 +36,40 @@ const (
 	HealthStatusClose     = "CLOSE"
 )
 
+func NewServer() *Server {
+	return &Server{
+		replicator: replicator.Manager(),

Review comment:
       我这边的设计的接口都叫manager,所以获取其实例就叫Manager了




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787507247



##########
File path: syncer/service/event/manager.go
##########
@@ -0,0 +1,253 @@
+package event
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator"
+	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+
+	"github.com/go-chassis/foundation/gopool"
+)
+
+const (
+	defaultInternal = 500 * time.Millisecond
+)
+
+var m Manager
+
+type Event struct {
+	*v1sync.Event
+
+	Result chan<- *Result
+}
+
+type Result struct {
+	ID    string
+	Data  *v1sync.Result
+	Error error
+}
+
+func Work() {
+	m = NewManager()
+	m.HandleEvent()
+	m.HandleResult()
+}
+
+func GetManager() Manager {
+	return m
+}
+
+type ManagerOption func(*managerOptions)
+
+type managerOptions struct {
+	internal   time.Duration
+	replicator replicator.Replicator
+}
+
+func ManagerInternal(i time.Duration) ManagerOption {
+	return func(options *managerOptions) {
+		options.internal = i
+	}
+}
+
+func toManagerOptions(os ...ManagerOption) *managerOptions {
+	mo := new(managerOptions)
+	mo.internal = defaultInternal
+	mo.replicator = replicator.Manager()
+	for _, o := range os {
+		o(mo)
+	}
+
+	return mo
+}
+
+func Replicator(r replicator.Replicator) ManagerOption {
+	return func(options *managerOptions) {
+		options.replicator = r
+	}
+}
+
+func NewManager(os ...ManagerOption) Manager {
+	mo := toManagerOptions(os...)
+	em := &eventManager{
+		events:     make(chan *Event, 1000),
+		result:     make(chan *Result, 1000),
+		internal:   mo.internal,
+		replicator: mo.replicator,
+	}
+	return em
+}
+
+type Sender interface {
+	Send(et *Event)
+}
+
+type Manager interface {
+	Sender
+
+	HandleEvent()
+	HandleResult()
+}
+
+type eventManager struct {
+	events chan *Event
+
+	internal time.Duration
+	ticker   *time.Ticker
+
+	cache  sync.Map
+	result chan *Result
+
+	replicator replicator.Replicator
+}
+
+func (e *eventManager) Send(et *Event) {
+	if et.Result == nil {
+		et.Result = e.result
+		e.cache.Store(et.Id, et)
+	}
+
+	e.events <- et
+}
+
+func (e *eventManager) HandleResult() {
+	gopool.Go(func(ctx context.Context) {
+		e.resultHandle(ctx)
+	})
+}
+
+func (e *eventManager) resultHandle(ctx context.Context) {
+	for {
+		select {
+		case res, ok := <-e.result:
+			if !ok {
+				continue
+			}
+			if res.Error != nil {
+				log.Error("result is error ", res.Error)
+				continue
+			}
+
+			id := res.ID
+			et, ok := e.cache.LoadAndDelete(id)
+			if !ok {
+				log.Warn(fmt.Sprintf("%s event not exist", id))
+				continue
+			}
+
+			r, result := resource.New(et.(*Event).Event)
+			if result != nil {
+				log.Warn(fmt.Sprintf("new resource failed, %s", result.Message))
+				continue
+			}
+
+			toSendEvent, err := r.FailHandle(ctx, res.Data.Code)
+			if err != nil {
+				continue
+			}
+			if toSendEvent != nil {
+				e.Send(&Event{
+					Event: toSendEvent,
+				})
+			}
+		case <-ctx.Done():
+			log.Info("result handle worker is closed")
+			return
+		}
+	}
+}
+
+func (e *eventManager) Close() {
+	e.ticker.Stop()
+	close(e.result)
+}
+
+type syncEvents []*Event
+
+func (s syncEvents) Len() int {
+	return len(s)
+}
+
+func (s syncEvents) Less(i, j int) bool {
+	return s[i].Timestamp < s[j].Timestamp
+}
+
+func (s syncEvents) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func (e *eventManager) HandleEvent() {
+	gopool.Go(func(ctx context.Context) {
+		e.handleEvent(ctx)
+	})
+}
+
+func (e *eventManager) handleEvent(ctx context.Context) {
+	events := make([]*Event, 0, 100)
+	e.ticker = time.NewTicker(e.internal)
+	for {
+		select {
+		case <-e.ticker.C:
+			if len(events) == 0 {
+				continue
+			}
+			send := events[:]
+
+			events = make([]*Event, 0, 100)
+			go e.handle(ctx, send)
+		case event, ok := <-e.events:
+			if !ok {
+				return
+			}
+
+			events = append(events, event)
+			if len(events) > 50 {
+				send := events[:]
+				events = make([]*Event, 0, 100)
+				go e.handle(ctx, send)
+			}
+		case <-ctx.Done():
+			e.Close()
+			return
+		}
+	}
+}
+
+func (e *eventManager) handle(ctx context.Context, es syncEvents) {
+	sort.Sort(es)
+
+	sendEvents := make([]*v1sync.Event, 0, len(es))
+	for _, event := range es {
+		sendEvents = append(sendEvents, event.Event)
+	}
+
+	result, err := e.replicator.Replicate(ctx, &v1sync.EventList{
+		Events: sendEvents,
+	})
+
+	if err != nil {
+		log.Error("replicate failed", err)
+		result = &v1sync.Results{
+			Results: make(map[string]*v1sync.Result),
+		}
+	}
+
+	for _, e := range es {
+		e.Result <- &Result{
+			ID:    e.Id,
+			Data:  result.Results[e.Id],
+			Error: err,
+		}
+	}
+}
+
+func Send(e *Event) {

Review comment:
       后面要多补充函数级注释




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] aseTo2016 commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
aseTo2016 commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787496561



##########
File path: client/set.go
##########
@@ -1,7 +1,7 @@
 package client
 
 import (
-	v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
+	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"

Review comment:
       现在其它资源都放在syncer下面了,所以我把api的这一部分也迁移到syncer下面去了




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r786592698



##########
File path: syncer/api/v1/event_service.proto
##########
@@ -0,0 +1,39 @@
+syntax = "proto3";
+
+package api.sync.v1;
+
+option go_package = "github.com/apache/servicecomb-service-center/api/sync/v1;v1";
+
+message EventList {
+  repeated Event events = 1;
+}
+

Review comment:
       挪到上层吧




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] aseTo2016 commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
aseTo2016 commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787384448



##########
File path: syncer/api/v1/event_service.proto
##########
@@ -0,0 +1,39 @@
+syntax = "proto3";
+
+package api.sync.v1;
+
+option go_package = "github.com/apache/servicecomb-service-center/api/sync/v1;v1";
+
+message EventList {
+  repeated Event events = 1;
+}
+

Review comment:
       接口的定义文件,有多个事件,所以需要返回list




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #1229: #1228 sync model

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #1229:
URL: https://github.com/apache/servicecomb-service-center/pull/1229#discussion_r787509012



##########
File path: syncer/service/task/manager_test.go
##########
@@ -0,0 +1,66 @@
+package task
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/servicecomb-service-center/syncer/service/event"
+
+	"github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewManager(t *testing.T) {
+	receiver := make(chan struct{}, 1)
+	fs := &forkSender{
+		events:  make(map[string]*event.Event),
+		receive: receiver,
+	}
+	ctx := context.TODO()
+	m := NewManager(
+		ManagerOperator(&mockOperator{
+			tasks: map[string]*sync.Task{
+				"xxx1": {
+					ID:           "xxx1",
+					ResourceType: "demo",
+					Action:       "create",
+					Timestamp:    0,
+					Status:       "pending",
+				},
+			},
+		}),
+		ManagerInternal(defaultInternal),
+		EventSender(fs))
+
+	m.LoadAndHandleTask(ctx)
+	m.UpdateResultTask(ctx)
+	<-receiver
+	assert.Equal(t, 1, len(fs.events))
+}
+
+type mockOperator struct {
+	tasks map[string]*sync.Task
+}
+
+func (f *mockOperator) ListTasks(_ context.Context) ([]*sync.Task, error) {
+	result := make([]*sync.Task, 0, len(f.tasks))
+	for _, task := range f.tasks {
+		result = append(result, task)
+	}
+	return result, nil
+}
+
+func (f *mockOperator) DeleteTask(_ context.Context, t *sync.Task) error {
+	delete(f.tasks, t.ID)
+	return nil
+}
+
+type forkSender struct {
+	events  map[string]*event.Event
+	receive chan struct{}
+}
+
+func (f *forkSender) Send(et *event.Event) {

Review comment:
       fake




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@servicecomb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org