You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/12/14 01:49:31 UTC
[servicecomb-service-center] branch master updated: Syncer
optimization: replace slice with channel in the implemention of queue
(#773)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 948ba02 Syncer optimization: replace slice with channel in the implemention of queue (#773)
948ba02 is described below
commit 948ba0267b70eb0aa9f41b8e413c876098a26afd
Author: lilai23 <46...@users.noreply.github.com>
AuthorDate: Mon Dec 14 09:49:21 2020 +0800
Syncer optimization: replace slice with channel in the implemention of queue (#773)
* Syncer optimization: replace slice with channel in the implemention of queue
* - 确保预声明数据量与实际同步相同
- 更新文档
* 更新UT
* update readme
---
datasource/etcd/event/instance_event_handler.go | 1 -
pkg/dump/dump.go | 1 -
server/syncernotify/websocket_test.go | 1 -
syncer/README-ZH.md | 9 +-
syncer/README.md | 14 +-
syncer/client/sync_client.go | 8 +-
syncer/client/sync_client_test.go | 6 +-
syncer/pkg/utils/error.go | 1 +
syncer/proto/syncer.pb.go | 252 ++++++++++--------------
syncer/proto/syncer.proto | 2 +
syncer/server/handler.go | 72 +------
syncer/server/handler_test.go | 126 ++----------
syncer/server/server.go | 144 +++++---------
13 files changed, 216 insertions(+), 421 deletions(-)
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index dfe2333..ae5919a 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -168,7 +168,6 @@ func NotifySyncerInstanceEvent(evt sd.KvEvent, domainProject string, ms *pb.Micr
Action: string(evt.Type),
Service: service,
Instance: instance,
- Revision: evt.Revision,
}
syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
diff --git a/pkg/dump/dump.go b/pkg/dump/dump.go
index d4cc8e3..20e462e 100644
--- a/pkg/dump/dump.go
+++ b/pkg/dump/dump.go
@@ -227,5 +227,4 @@ type WatchInstanceChangedEvent struct {
Action string `protobuf:"bytes,1,opt,name=action" json:"action,omitempty"`
Service *Microservice `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"`
Instance *Instance `protobuf:"bytes,3,opt,name=instance" json:"instance,omitempty"`
- Revision int64 `protobuf:"bytes,4,opt,name=revision" json:"revision,omitempty"`
}
diff --git a/server/syncernotify/websocket_test.go b/server/syncernotify/websocket_test.go
index be9f927..9dd77d6 100644
--- a/server/syncernotify/websocket_test.go
+++ b/server/syncernotify/websocket_test.go
@@ -117,7 +117,6 @@ func TestDoWebSocketWatch(t *testing.T) {
HostName: "sunlisen",
},
},
- Revision: 10,
}
GetSyncerNotifyCenter().AddEvent(instanceEvent)
diff --git a/syncer/README-ZH.md b/syncer/README-ZH.md
index dc99363..80dabf0 100644
--- a/syncer/README-ZH.md
+++ b/syncer/README-ZH.md
@@ -57,6 +57,12 @@ $ go build
- cluster-port: 当mode为“cluster”时,Syncer集群成员之间进行通信的端口
- node:当mode为“cluster”时,syncer集群成员名称。
+###### 同步模式说明
+- 增量同步:默认的同步模式。
+- 全量同步:每个syncer启动加入时自动触发一次,也可以在增量同步异常告警之后手动调用接口触发。
+```bash
+$ curl http://localhost:30300/v1/syncer/full-synchronization
+```
假设有2个服务中心,每个服务中心都有一个用于微服务发现和注册的服务中心集群,如下所示:
@@ -110,4 +116,5 @@ Syncer是一个开发中版本,在下面列出已支持的特性,更多开
- 支持多个servicecomb-service-center 服务中心之间进行数据同步
- 在etcd中固化存储微服务实例映射表
-- 支持集群模式部署Syncer,每个syncer集群拥有3个实例
\ No newline at end of file
+- 支持集群模式部署Syncer,每个syncer集群拥有3个实例
+- 支持增量同步为主要同步机制,每个syncer加入时触发一次全量同步,增量同步异常告警后也可手动进行全量同步
\ No newline at end of file
diff --git a/syncer/README.md b/syncer/README.md
index ff34a5b..1d94e88 100644
--- a/syncer/README.md
+++ b/syncer/README.md
@@ -80,8 +80,19 @@ $ go build
Member name of Syncer cluster when mode is set to be "cluster".
+###### Synchronization Mode Description
+- Incremental Synchronization:
+
+ Default synchronization mode.
+- Full Synchronization:
+ A full synchronization is automatically triggered when each syncer starts to join,
+ or it can be triggered manually by calling the interface after an exception occurs in the incremental synchronization.
+
+```bash
+$ curl http://localhost:30300/v1/syncer/full-synchronization
+```
Suppose there are 2 Service centers, each of them with a Service-center cluster for microservices discovery and registry, as following,
@@ -136,4 +147,5 @@ Syncer is in developing progress, reference to [TODO](./TODO.md) to get more dev
- Data synchronization among multiple servicecomb-service-centers
- Solidify the mapping table of micro-service instances into etcd
- Support Syncer cluster mode, each Syncer has 3 instances
-
+- Support incremental synchronization as the main synchronization mechanism.
+ When each syncer joins, a full synchronization is triggered. It can also be performed manually after an exception occurs in incremental synchronization.
diff --git a/syncer/client/sync_client.go b/syncer/client/sync_client.go
index ae71d5d..a425e2f 100644
--- a/syncer/client/sync_client.go
+++ b/syncer/client/sync_client.go
@@ -61,8 +61,8 @@ func NewSyncClient(addr string, tlsConf *tls.Config) (cli *Client) {
}
// Pull implement the interface of sync server
-func (c *Client) Pull(ctx context.Context) (*pb.SyncData, error) {
- data, err := c.cli.Pull(ctx, &pb.PullRequest{})
+func (c *Client) Pull(ctx context.Context, addr string) (*pb.SyncData, error) {
+ data, err := c.cli.Pull(ctx, &pb.PullRequest{Addr: addr})
if err != nil {
log.Errorf(err, "Pull from grpc client failed, going to close the client")
closeClient(c.addr)
@@ -70,8 +70,8 @@ func (c *Client) Pull(ctx context.Context) (*pb.SyncData, error) {
return data, err
}
-func (c *Client) IncrementPull(ctx context.Context, addr string) (*pb.SyncData, error) {
- data, err := c.cli.IncrementPull(ctx, &pb.IncrementPullRequest{Addr: addr})
+func (c *Client) IncrementPull(ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
+ data, err := c.cli.IncrementPull(ctx, req)
if err != nil {
log.Error("Pull from grpc client failed, going to close the client", err)
closeClient(c.addr)
diff --git a/syncer/client/sync_client_test.go b/syncer/client/sync_client_test.go
index f666c7b..1317687 100644
--- a/syncer/client/sync_client_test.go
+++ b/syncer/client/sync_client_test.go
@@ -15,18 +15,18 @@ var c = NewSyncClient("", new(tls.Config))
func TestClient_IncrementPull(t *testing.T) {
t.Run("Test IncrementPull", func(t *testing.T) {
- _, err := c.IncrementPull(context.Background(), "http://127.0.0.1")
+ _, err := c.IncrementPull(context.Background(), &pb.IncrementPullRequest{Addr: "http://127.0.0.1", Length: 3})
assert.Error(t, err, "IncrementPull fail without grpc")
})
t.Run("Test IncrementPull", func(t *testing.T) {
defer monkey.UnpatchAll()
monkey.PatchInstanceMethod(reflect.TypeOf((*Client)(nil)),
- "IncrementPull", func(client *Client, ctx context.Context, string2 string) (*pb.SyncData, error) {
+ "IncrementPull", func(client *Client, ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
return syncDataCreate(), nil
})
- syncData, err := c.IncrementPull(context.Background(), "http://127.0.0.1")
+ syncData, err := c.IncrementPull(context.Background(), &pb.IncrementPullRequest{Addr: "http://127.0.0.1", Length: 3})
assert.NoError(t, err, "IncrementPull no err when client exist")
assert.NotNil(t, syncData, "syncData not nil when client exist")
})
diff --git a/syncer/pkg/utils/error.go b/syncer/pkg/utils/error.go
index 2c06673..19e922f 100644
--- a/syncer/pkg/utils/error.go
+++ b/syncer/pkg/utils/error.go
@@ -26,4 +26,5 @@ var (
ErrActionInvalid = errors.New("action invalid")
ErrMappingSearch = errors.New("mapping does not exist")
ErrInstanceDelete = errors.New("delete instance failed")
+ ErrChannelSearch = errors.New("channel does not exist")
)
diff --git a/syncer/proto/syncer.pb.go b/syncer/proto/syncer.pb.go
index 72822ea..a0919c5 100644
--- a/syncer/proto/syncer.pb.go
+++ b/syncer/proto/syncer.pb.go
@@ -23,12 +23,12 @@ import (
fmt "fmt"
math "math"
- proto1 "github.com/golang/protobuf/proto"
+ proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto1.Marshal
+var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
@@ -36,7 +36,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
-const _ = proto1.ProtoPackageIsVersion3 // please upgrade the proto package
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type SyncService_Status int32
@@ -59,7 +59,7 @@ var SyncService_Status_value = map[string]int32{
}
func (x SyncService_Status) String() string {
- return proto1.EnumName(SyncService_Status_name, int32(x))
+ return proto.EnumName(SyncService_Status_name, int32(x))
}
func (SyncService_Status) EnumDescriptor() ([]byte, []int) {
@@ -93,7 +93,7 @@ var SyncInstance_Status_value = map[string]int32{
}
func (x SyncInstance_Status) String() string {
- return proto1.EnumName(SyncInstance_Status_name, int32(x))
+ return proto.EnumName(SyncInstance_Status_name, int32(x))
}
func (SyncInstance_Status) EnumDescriptor() ([]byte, []int) {
@@ -121,7 +121,7 @@ var HealthCheck_Modes_value = map[string]int32{
}
func (x HealthCheck_Modes) String() string {
- return proto1.EnumName(HealthCheck_Modes_name, int32(x))
+ return proto.EnumName(HealthCheck_Modes_name, int32(x))
}
func (HealthCheck_Modes) EnumDescriptor() ([]byte, []int) {
@@ -132,10 +132,11 @@ type PullRequest struct {
ServiceName string `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
Options string `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"`
Time string `protobuf:"bytes,3,opt,name=time,proto3" json:"time,omitempty"`
+ Addr string `protobuf:"bytes,4,opt,name=addr,proto3" json:"addr,omitempty"`
}
func (m *PullRequest) Reset() { *m = PullRequest{} }
-func (m *PullRequest) String() string { return proto1.CompactTextString(m) }
+func (m *PullRequest) String() string { return proto.CompactTextString(m) }
func (*PullRequest) ProtoMessage() {}
func (*PullRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{0}
@@ -162,12 +163,20 @@ func (m *PullRequest) GetTime() string {
return ""
}
+func (m *PullRequest) GetAddr() string {
+ if m != nil {
+ return m.Addr
+ }
+ return ""
+}
+
type IncrementPullRequest struct {
- Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
+ Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
+ Length int64 `protobuf:"varint,2,opt,name=length,proto3" json:"length,omitempty"`
}
func (m *IncrementPullRequest) Reset() { *m = IncrementPullRequest{} }
-func (m *IncrementPullRequest) String() string { return proto1.CompactTextString(m) }
+func (m *IncrementPullRequest) String() string { return proto.CompactTextString(m) }
func (*IncrementPullRequest) ProtoMessage() {}
func (*IncrementPullRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{1}
@@ -180,12 +189,19 @@ func (m *IncrementPullRequest) GetAddr() string {
return ""
}
+func (m *IncrementPullRequest) GetLength() int64 {
+ if m != nil {
+ return m.Length
+ }
+ return 0
+}
+
type DeclareRequest struct {
Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
}
func (m *DeclareRequest) Reset() { *m = DeclareRequest{} }
-func (m *DeclareRequest) String() string { return proto1.CompactTextString(m) }
+func (m *DeclareRequest) String() string { return proto.CompactTextString(m) }
func (*DeclareRequest) ProtoMessage() {}
func (*DeclareRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{2}
@@ -203,7 +219,7 @@ type DeclareResponse struct {
}
func (m *DeclareResponse) Reset() { *m = DeclareResponse{} }
-func (m *DeclareResponse) String() string { return proto1.CompactTextString(m) }
+func (m *DeclareResponse) String() string { return proto.CompactTextString(m) }
func (*DeclareResponse) ProtoMessage() {}
func (*DeclareResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{3}
@@ -222,7 +238,7 @@ type SyncData struct {
}
func (m *SyncData) Reset() { *m = SyncData{} }
-func (m *SyncData) String() string { return proto1.CompactTextString(m) }
+func (m *SyncData) String() string { return proto.CompactTextString(m) }
func (*SyncData) ProtoMessage() {}
func (*SyncData) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{4}
@@ -247,7 +263,7 @@ type SyncService struct {
App string `protobuf:"bytes,2,opt,name=app,proto3" json:"app,omitempty"`
Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"`
- Status SyncService_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto1.SyncService_Status" json:"status,omitempty"`
+ Status SyncService_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto.SyncService_Status" json:"status,omitempty"`
DomainProject string `protobuf:"bytes,6,opt,name=domainProject,proto3" json:"domainProject,omitempty"`
Environment string `protobuf:"bytes,7,opt,name=environment,proto3" json:"environment,omitempty"`
PluginName string `protobuf:"bytes,8,opt,name=pluginName,proto3" json:"pluginName,omitempty"`
@@ -255,7 +271,7 @@ type SyncService struct {
}
func (m *SyncService) Reset() { *m = SyncService{} }
-func (m *SyncService) String() string { return proto1.CompactTextString(m) }
+func (m *SyncService) String() string { return proto.CompactTextString(m) }
func (*SyncService) ProtoMessage() {}
func (*SyncService) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{5}
@@ -329,7 +345,7 @@ type SyncInstance struct {
ServiceId string `protobuf:"bytes,2,opt,name=serviceId,proto3" json:"serviceId,omitempty"`
Endpoints []string `protobuf:"bytes,3,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
HostName string `protobuf:"bytes,4,opt,name=hostName,proto3" json:"hostName,omitempty"`
- Status SyncInstance_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto1.SyncInstance_Status" json:"status,omitempty"`
+ Status SyncInstance_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto.SyncInstance_Status" json:"status,omitempty"`
HealthCheck *HealthCheck `protobuf:"bytes,6,opt,name=healthCheck,proto3" json:"healthCheck,omitempty"`
Version string `protobuf:"bytes,7,opt,name=version,proto3" json:"version,omitempty"`
PluginName string `protobuf:"bytes,8,opt,name=pluginName,proto3" json:"pluginName,omitempty"`
@@ -337,7 +353,7 @@ type SyncInstance struct {
}
func (m *SyncInstance) Reset() { *m = SyncInstance{} }
-func (m *SyncInstance) String() string { return proto1.CompactTextString(m) }
+func (m *SyncInstance) String() string { return proto.CompactTextString(m) }
func (*SyncInstance) ProtoMessage() {}
func (*SyncInstance) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{6}
@@ -413,7 +429,7 @@ type Expansion struct {
}
func (m *Expansion) Reset() { *m = Expansion{} }
-func (m *Expansion) String() string { return proto1.CompactTextString(m) }
+func (m *Expansion) String() string { return proto.CompactTextString(m) }
func (*Expansion) ProtoMessage() {}
func (*Expansion) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{7}
@@ -441,7 +457,7 @@ func (m *Expansion) GetLabels() map[string]string {
}
type HealthCheck struct {
- Mode HealthCheck_Modes `protobuf:"varint,1,opt,name=mode,proto3,enum=proto1.HealthCheck_Modes" json:"mode,omitempty"`
+ Mode HealthCheck_Modes `protobuf:"varint,1,opt,name=mode,proto3,enum=proto.HealthCheck_Modes" json:"mode,omitempty"`
Port int32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
Interval int32 `protobuf:"varint,3,opt,name=interval,proto3" json:"interval,omitempty"`
Times int32 `protobuf:"varint,4,opt,name=times,proto3" json:"times,omitempty"`
@@ -449,7 +465,7 @@ type HealthCheck struct {
}
func (m *HealthCheck) Reset() { *m = HealthCheck{} }
-func (m *HealthCheck) String() string { return proto1.CompactTextString(m) }
+func (m *HealthCheck) String() string { return proto.CompactTextString(m) }
func (*HealthCheck) ProtoMessage() {}
func (*HealthCheck) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{8}
@@ -501,7 +517,7 @@ type MappingEntry struct {
}
func (m *MappingEntry) Reset() { *m = MappingEntry{} }
-func (m *MappingEntry) String() string { return proto1.CompactTextString(m) }
+func (m *MappingEntry) String() string { return proto.CompactTextString(m) }
func (*MappingEntry) ProtoMessage() {}
func (*MappingEntry) Descriptor() ([]byte, []int) {
return fileDescriptor_9577b640f2aab197, []int{9}
@@ -550,80 +566,81 @@ func (m *MappingEntry) GetCurInstanceID() string {
}
func init() {
- proto1.RegisterEnum("proto.SyncService_Status", SyncService_Status_name, SyncService_Status_value)
- proto1.RegisterEnum("proto.SyncInstance_Status", SyncInstance_Status_name, SyncInstance_Status_value)
- proto1.RegisterEnum("proto.HealthCheck_Modes", HealthCheck_Modes_name, HealthCheck_Modes_value)
- proto1.RegisterType((*PullRequest)(nil), "proto.PullRequest")
- proto1.RegisterType((*IncrementPullRequest)(nil), "proto.IncrementPullRequest")
- proto1.RegisterType((*DeclareRequest)(nil), "proto.DeclareRequest")
- proto1.RegisterType((*DeclareResponse)(nil), "proto.DeclareResponse")
- proto1.RegisterType((*SyncData)(nil), "proto.SyncData")
- proto1.RegisterType((*SyncService)(nil), "proto.SyncService")
- proto1.RegisterType((*SyncInstance)(nil), "proto.SyncInstance")
- proto1.RegisterType((*Expansion)(nil), "proto.Expansion")
- proto1.RegisterMapType((map[string]string)(nil), "proto.Expansion.LabelsEntry")
- proto1.RegisterType((*HealthCheck)(nil), "proto.HealthCheck")
- proto1.RegisterType((*MappingEntry)(nil), "proto.MappingEntry")
-}
-
-func init() { proto1.RegisterFile("syncer.proto1", fileDescriptor_9577b640f2aab197) }
+ proto.RegisterEnum("proto.SyncService_Status", SyncService_Status_name, SyncService_Status_value)
+ proto.RegisterEnum("proto.SyncInstance_Status", SyncInstance_Status_name, SyncInstance_Status_value)
+ proto.RegisterEnum("proto.HealthCheck_Modes", HealthCheck_Modes_name, HealthCheck_Modes_value)
+ proto.RegisterType((*PullRequest)(nil), "proto.PullRequest")
+ proto.RegisterType((*IncrementPullRequest)(nil), "proto.IncrementPullRequest")
+ proto.RegisterType((*DeclareRequest)(nil), "proto.DeclareRequest")
+ proto.RegisterType((*DeclareResponse)(nil), "proto.DeclareResponse")
+ proto.RegisterType((*SyncData)(nil), "proto.SyncData")
+ proto.RegisterType((*SyncService)(nil), "proto.SyncService")
+ proto.RegisterType((*SyncInstance)(nil), "proto.SyncInstance")
+ proto.RegisterType((*Expansion)(nil), "proto.Expansion")
+ proto.RegisterMapType((map[string]string)(nil), "proto.Expansion.LabelsEntry")
+ proto.RegisterType((*HealthCheck)(nil), "proto.HealthCheck")
+ proto.RegisterType((*MappingEntry)(nil), "proto.MappingEntry")
+}
+
+func init() { proto.RegisterFile("syncer.proto", fileDescriptor_9577b640f2aab197) }
var fileDescriptor_9577b640f2aab197 = []byte{
- // 854 bytes of a gzipped FileDescriptorProto
+ // 868 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xdd, 0x8e, 0xe3, 0x34,
- 0x14, 0x9e, 0xa4, 0xe9, 0xdf, 0x49, 0x77, 0xb6, 0x1c, 0x16, 0x14, 0xca, 0x08, 0x55, 0xd1, 0x0a,
- 0x46, 0x08, 0x2a, 0xb6, 0xec, 0x05, 0x0b, 0x17, 0x08, 0xb6, 0xc3, 0x6e, 0xc5, 0x4c, 0xa7, 0x72,
- 0xa7, 0x20, 0x21, 0x71, 0x91, 0x49, 0xad, 0x36, 0x4c, 0xea, 0x04, 0xdb, 0xa9, 0x98, 0x07, 0x82,
- 0xa7, 0x80, 0x87, 0xe1, 0x96, 0xa7, 0x40, 0x76, 0xdc, 0xd6, 0xe9, 0x54, 0x88, 0x8b, 0xbd, 0x8a,
- 0xfd, 0xf9, 0xcb, 0xf1, 0xf9, 0xf9, 0x7c, 0x0e, 0x74, 0xc4, 0x3d, 0x8b, 0x29, 0x1f, 0xe4, 0x3c,
- 0x93, 0x19, 0xd6, 0xf5, 0x27, 0xfc, 0x19, 0xfc, 0x69, 0x91, 0xa6, 0x84, 0xfe, 0x5a, 0x50, 0x21,
- 0xb1, 0x0f, 0xbe, 0xa0, 0x7c, 0x93, 0xc4, 0x74, 0x12, 0xad, 0x69, 0xe0, 0xf4, 0x9d, 0xf3, 0x36,
- 0xb1, 0x21, 0x0c, 0xa0, 0x99, 0xe5, 0x32, 0xc9, 0x98, 0x08, 0x5c, 0x7d, 0xba, 0xdd, 0x22, 0x82,
- 0x27, 0x93, 0x35, 0x0d, 0x6a, 0x1a, 0xd6, 0xeb, 0xf0, 0x63, 0x78, 0x32, 0x66, 0x31, 0xa7, 0x6b,
- 0xca, 0xa4, 0x7d, 0x0f, 0x82, 0x17, 0x2d, 0x16, 0xdc, 0x5c, 0xa0, 0xd7, 0xe1, 0x53, 0x38, 0x1d,
- 0xd1, 0x38, 0x8d, 0x38, 0xfd, 0x2f, 0xd6, 0x0b, 0x78, 0xbc, 0x63, 0x89, 0x3c, 0x63, 0x82, 0xe2,
- 0x87, 0x70, 0xaa, 0x42, 0x1b, 0x45, 0x32, 0xba, 0xa4, 0x6c, 0x29, 0x57, 0xfa, 0x87, 0x1a, 0x39,
- 0x40, 0xc3, 0x35, 0xb4, 0x66, 0x06, 0xc1, 0x01, 0xb4, 0x4c, 0x54, 0x22, 0x70, 0xfa, 0xb5, 0x73,
- 0x7f, 0x88, 0x65, 0x62, 0x06, 0x8a, 0x32, 0x2b, 0x8f, 0xc8, 0x8e, 0x83, 0xcf, 0xa0, 0x9d, 0x30,
- 0x21, 0x23, 0xa6, 0x7e, 0x70, 0xf5, 0x0f, 0x6f, 0x5b, 0x3f, 0x8c, 0xcd, 0x19, 0xd9, 0xb3, 0xc2,
- 0xbf, 0x5d, 0xf0, 0x2d, 0x63, 0x78, 0x06, 0x6d, 0x63, 0x6e, 0xbc, 0x30, 0x21, 0xed, 0x01, 0xec,
- 0x42, 0x2d, 0xca, 0x73, 0x93, 0x53, 0xb5, 0x54, 0xd1, 0xb3, 0x68, 0x9f, 0x4f, 0x66, 0xb2, 0xbf,
- 0xa1, 0x5c, 0x24, 0x19, 0x0b, 0xbc, 0x32, 0xfb, 0x66, 0x8b, 0xcf, 0xa0, 0x21, 0x64, 0x24, 0x0b,
- 0x11, 0xd4, 0xfb, 0xce, 0xf9, 0xe9, 0xf0, 0xbd, 0x87, 0xe1, 0x0c, 0x66, 0x9a, 0x40, 0x0c, 0x11,
- 0x9f, 0xc2, 0xa3, 0x45, 0xb6, 0x8e, 0x12, 0x36, 0xe5, 0xd9, 0x2f, 0x34, 0x96, 0x41, 0x43, 0x9b,
- 0xac, 0x82, 0x4a, 0x12, 0x94, 0x6d, 0x12, 0x9e, 0x31, 0x55, 0xc4, 0xa0, 0x59, 0x4a, 0xc2, 0x82,
- 0xf0, 0x03, 0x80, 0x3c, 0x2d, 0x96, 0x09, 0xd3, 0x9a, 0x69, 0x69, 0x82, 0x85, 0xe0, 0x67, 0x00,
- 0xf4, 0xb7, 0x3c, 0x62, 0x42, 0xab, 0xa6, 0xad, 0x93, 0xd7, 0x35, 0xee, 0x5d, 0x6c, 0x0f, 0x88,
- 0xc5, 0x09, 0x3f, 0x82, 0x46, 0xe9, 0x2b, 0xfa, 0xd0, 0x9c, 0x4f, 0xbe, 0x9f, 0x5c, 0xff, 0x38,
- 0xe9, 0x9e, 0x60, 0x03, 0xdc, 0xf9, 0xb4, 0xeb, 0x60, 0x0b, 0xbc, 0x91, 0x42, 0xdc, 0xf0, 0xf7,
- 0x1a, 0x74, 0xec, 0xfc, 0x2b, 0x5f, 0xb6, 0x15, 0xd8, 0x65, 0xd9, 0x42, 0xaa, 0x45, 0x70, 0x0f,
- 0x8b, 0x70, 0x06, 0x6d, 0xca, 0x16, 0x79, 0x96, 0x30, 0x29, 0x82, 0x5a, 0xbf, 0xa6, 0x4e, 0x77,
- 0x00, 0xf6, 0xa0, 0xb5, 0xca, 0x84, 0xd4, 0x51, 0x96, 0xd9, 0xdf, 0xed, 0x71, 0x78, 0x90, 0xfe,
- 0xde, 0x11, 0x71, 0x1c, 0xe6, 0xff, 0x39, 0xf8, 0x2b, 0x1a, 0xa5, 0x72, 0xf5, 0x72, 0x45, 0xe3,
- 0x3b, 0x9d, 0xfd, 0xbd, 0x0c, 0x5f, 0xef, 0x4f, 0x88, 0x4d, 0xb3, 0x25, 0xd0, 0xac, 0x4a, 0xe0,
- 0xcd, 0xd7, 0xe1, 0xd5, 0xff, 0xac, 0x03, 0x76, 0xa0, 0x35, 0xbb, 0xf9, 0x86, 0xdc, 0x8c, 0x27,
- 0xaf, 0xba, 0x35, 0xec, 0x42, 0xe7, 0x7a, 0x7e, 0x73, 0xfd, 0xdd, 0xec, 0x82, 0xfc, 0x30, 0x7e,
- 0x79, 0xd1, 0xf5, 0xc2, 0x3f, 0x1c, 0x68, 0xef, 0xae, 0x50, 0xca, 0xbe, 0x4b, 0xd8, 0xb6, 0x3c,
- 0x7a, 0x8d, 0x4f, 0xa0, 0x7e, 0x7b, 0x2f, 0x69, 0xd9, 0x55, 0x3a, 0xa4, 0xdc, 0xe0, 0x73, 0x68,
- 0xa4, 0xd1, 0x2d, 0x4d, 0xcb, 0x6a, 0xf8, 0xc3, 0xb3, 0x43, 0x77, 0x07, 0x97, 0xfa, 0xf8, 0x82,
- 0x49, 0x7e, 0x4f, 0x0c, 0xb7, 0xf7, 0x02, 0x7c, 0x0b, 0x56, 0x4f, 0xeb, 0x8e, 0xde, 0x9b, 0xdb,
- 0xd4, 0x52, 0x5d, 0xb6, 0x89, 0xd2, 0x82, 0x1a, 0x05, 0x94, 0x9b, 0x2f, 0xdd, 0x2f, 0x9c, 0xf0,
- 0x2f, 0x07, 0x7c, 0x2b, 0xf5, 0xf8, 0x09, 0x78, 0xeb, 0x6c, 0x51, 0x76, 0xc2, 0xd3, 0x61, 0xf0,
- 0xb0, 0x38, 0x83, 0xab, 0x6c, 0x41, 0x05, 0xd1, 0x2c, 0x15, 0x58, 0x9e, 0x71, 0xa9, 0xcd, 0xd6,
- 0x89, 0x5e, 0x2b, 0xd5, 0x24, 0x4c, 0x52, 0xbe, 0x89, 0x52, 0xfd, 0x94, 0xeb, 0x64, 0xb7, 0x57,
- 0x7e, 0xa8, 0x36, 0x29, 0xb4, 0x9c, 0xea, 0xa4, 0xdc, 0x28, 0x7f, 0x0b, 0x9e, 0x6a, 0x21, 0xb5,
- 0x89, 0x5a, 0x86, 0xe7, 0x50, 0xd7, 0xd7, 0x54, 0xcb, 0xd0, 0x02, 0x6f, 0x3a, 0x9f, 0xbd, 0x2e,
- 0x0b, 0x31, 0x9d, 0x5f, 0x5e, 0x76, 0xdd, 0xf0, 0x1f, 0x07, 0x3a, 0x57, 0x51, 0x9e, 0x27, 0x6c,
- 0x59, 0x06, 0xdf, 0x07, 0x3f, 0x4e, 0x0b, 0x21, 0x29, 0xb7, 0x3b, 0xba, 0x05, 0x3d, 0x6c, 0x03,
- 0xee, 0xb1, 0x36, 0x10, 0x42, 0x27, 0xe3, 0x4b, 0xd3, 0x49, 0xc6, 0x23, 0xd3, 0x95, 0x2a, 0x98,
- 0xb2, 0x94, 0xf1, 0xe5, 0x56, 0xee, 0xe3, 0x91, 0x79, 0x25, 0x55, 0x50, 0x59, 0x8a, 0x0b, 0xbe,
- 0xb7, 0x54, 0xc6, 0x59, 0xc1, 0x94, 0xa5, 0xb8, 0xe0, 0x96, 0x25, 0xd3, 0x9a, 0x2a, 0xe0, 0xf0,
- 0x4f, 0x07, 0x3c, 0xf5, 0xc0, 0xf0, 0x53, 0xf0, 0xd4, 0x74, 0xc1, 0xed, 0xe3, 0xb1, 0x46, 0x4d,
- 0xef, 0xb1, 0xf5, 0x12, 0x55, 0xeb, 0x0f, 0x4f, 0x70, 0x04, 0x6f, 0x99, 0x19, 0xb2, 0x9f, 0x0e,
- 0xf8, 0x8e, 0xe1, 0x55, 0x67, 0x50, 0xef, 0xdd, 0x43, 0xb8, 0x1c, 0x3a, 0xe1, 0x09, 0x7e, 0x0d,
- 0x8f, 0x2a, 0xb3, 0x0d, 0xdf, 0x37, 0xd4, 0x63, 0x13, 0xef, 0x88, 0x1b, 0xdf, 0xb6, 0x7f, 0x6a,
- 0x0e, 0xbe, 0xd2, 0xe8, 0x6d, 0x43, 0x7f, 0x3e, 0xff, 0x37, 0x00, 0x00, 0xff, 0xff, 0xed, 0xcc,
- 0x45, 0x4e, 0xa4, 0x07, 0x00, 0x00,
+ 0x14, 0x9e, 0xa4, 0xff, 0x27, 0xdd, 0xd9, 0x62, 0x96, 0x55, 0x28, 0x23, 0x54, 0x45, 0x2b, 0x98,
+ 0x0b, 0xa8, 0xd8, 0xb2, 0x17, 0x2c, 0x5c, 0x20, 0x76, 0x3b, 0xec, 0x56, 0xcc, 0x76, 0x2a, 0x77,
+ 0x0a, 0x12, 0x77, 0x99, 0xd4, 0x6a, 0xc3, 0xa4, 0x76, 0xc6, 0x76, 0x2a, 0xe6, 0x81, 0xe0, 0x29,
+ 0xe0, 0x61, 0xb8, 0xe5, 0x29, 0x90, 0x7f, 0xda, 0x38, 0x9d, 0x0a, 0x71, 0xc1, 0x55, 0x7c, 0x3e,
+ 0x7f, 0x39, 0xf6, 0x39, 0xe7, 0xf3, 0x39, 0xd0, 0x15, 0xf7, 0x34, 0x21, 0x7c, 0x98, 0x73, 0x26,
+ 0x19, 0x6a, 0xe8, 0x4f, 0x74, 0x07, 0xc1, 0xac, 0xc8, 0x32, 0x4c, 0xee, 0x0a, 0x22, 0x24, 0x1a,
+ 0x40, 0x20, 0x08, 0xdf, 0xa6, 0x09, 0x99, 0xc6, 0x1b, 0x12, 0x7a, 0x03, 0xef, 0xbc, 0x83, 0x5d,
+ 0x08, 0x85, 0xd0, 0x62, 0xb9, 0x4c, 0x19, 0x15, 0xa1, 0xaf, 0x77, 0x77, 0x26, 0x42, 0x50, 0x97,
+ 0xe9, 0x86, 0x84, 0x35, 0x0d, 0xeb, 0xb5, 0xc2, 0xe2, 0xe5, 0x92, 0x87, 0x75, 0x83, 0xa9, 0x75,
+ 0xf4, 0x0a, 0x9e, 0x4c, 0x68, 0xc2, 0xc9, 0x86, 0x50, 0xe9, 0x9e, 0xbd, 0xe3, 0x7a, 0x25, 0x17,
+ 0x3d, 0x85, 0x66, 0x46, 0xe8, 0x4a, 0xae, 0xf5, 0x61, 0x35, 0x6c, 0xad, 0xe8, 0x19, 0x9c, 0x8e,
+ 0x49, 0x92, 0xc5, 0x9c, 0xfc, 0xcb, 0xdf, 0xd1, 0x4b, 0x78, 0xbc, 0x67, 0x89, 0x9c, 0x51, 0x41,
+ 0xd0, 0x27, 0x70, 0xaa, 0xd2, 0x30, 0x8e, 0x65, 0x7c, 0x69, 0x1c, 0x7b, 0xda, 0xf1, 0x01, 0x1a,
+ 0x6d, 0xa0, 0x3d, 0xb7, 0x08, 0x1a, 0x42, 0xdb, 0x66, 0x40, 0x84, 0xde, 0xa0, 0x76, 0x1e, 0x8c,
+ 0x90, 0x49, 0xe2, 0x50, 0x51, 0xe6, 0x66, 0x0b, 0xef, 0x39, 0xe8, 0x39, 0x74, 0x52, 0x2a, 0x64,
+ 0x4c, 0xd5, 0x0f, 0xbe, 0xfe, 0xe1, 0x7d, 0xe7, 0x87, 0x89, 0xdd, 0xc3, 0x25, 0x2b, 0xfa, 0xcb,
+ 0x87, 0xc0, 0x71, 0x86, 0xce, 0xa0, 0x63, 0xdd, 0x4d, 0x96, 0x36, 0xa4, 0x12, 0x40, 0x3d, 0xa8,
+ 0xc5, 0x79, 0x6e, 0xf3, 0xaf, 0x96, 0x2a, 0x7a, 0x1a, 0x97, 0xb9, 0xa7, 0xb6, 0x52, 0x5b, 0xc2,
+ 0x45, 0xca, 0xa8, 0x4d, 0xff, 0xce, 0x44, 0xcf, 0xa1, 0x29, 0x64, 0x2c, 0x0b, 0x11, 0x36, 0x06,
+ 0xde, 0xf9, 0xe9, 0xe8, 0xc3, 0x87, 0xe1, 0x0c, 0xe7, 0x9a, 0x80, 0x2d, 0x11, 0x3d, 0x83, 0x47,
+ 0x4b, 0xb6, 0x89, 0x53, 0x3a, 0xe3, 0xec, 0x17, 0x92, 0xc8, 0xb0, 0xa9, 0x5d, 0x56, 0x41, 0x25,
+ 0x1f, 0x42, 0xb7, 0x29, 0x67, 0x54, 0x15, 0x37, 0x6c, 0x19, 0xf9, 0x38, 0x10, 0xfa, 0x18, 0x20,
+ 0xcf, 0x8a, 0x55, 0x4a, 0xb5, 0xbe, 0xda, 0x9a, 0xe0, 0x20, 0xe8, 0x0b, 0x00, 0xf2, 0x6b, 0x1e,
+ 0x53, 0xa1, 0x15, 0xd6, 0xd1, 0xc9, 0xeb, 0xd9, 0xeb, 0x5d, 0xec, 0x36, 0xb0, 0xc3, 0x89, 0x3e,
+ 0x85, 0xa6, 0xb9, 0x2b, 0x0a, 0xa0, 0xb5, 0x98, 0xfe, 0x30, 0xbd, 0xfa, 0x69, 0xda, 0x3b, 0x41,
+ 0x4d, 0xf0, 0x17, 0xb3, 0x9e, 0x87, 0xda, 0x50, 0x1f, 0x2b, 0xc4, 0x8f, 0x7e, 0xab, 0x41, 0xd7,
+ 0xcd, 0xbf, 0xba, 0xcb, 0xae, 0x02, 0xfb, 0x2c, 0x3b, 0x48, 0xb5, 0x08, 0xfe, 0x61, 0x11, 0xce,
+ 0xa0, 0x43, 0xe8, 0x32, 0x67, 0x29, 0x95, 0x22, 0xac, 0x0d, 0x6a, 0x6a, 0x77, 0x0f, 0xa0, 0x3e,
+ 0xb4, 0xd7, 0x4c, 0x48, 0x1d, 0xa5, 0xc9, 0xfe, 0xde, 0x46, 0xa3, 0x83, 0xf4, 0xf7, 0x8f, 0x88,
+ 0xe3, 0x30, 0xff, 0x2f, 0x20, 0x58, 0x93, 0x38, 0x93, 0xeb, 0xd7, 0x6b, 0x92, 0xdc, 0xea, 0xec,
+ 0x97, 0x32, 0x7c, 0x5b, 0xee, 0x60, 0x97, 0xe6, 0x4a, 0xa0, 0x55, 0x95, 0xc0, 0xff, 0x5f, 0x87,
+ 0x37, 0xff, 0xb1, 0x0e, 0xa8, 0x0b, 0xed, 0xf9, 0xf5, 0x77, 0xf8, 0x7a, 0x32, 0x7d, 0xd3, 0xab,
+ 0xa1, 0x1e, 0x74, 0xaf, 0x16, 0xd7, 0x57, 0xdf, 0xcf, 0x2f, 0xf0, 0x8f, 0x93, 0xd7, 0x17, 0xbd,
+ 0x7a, 0xf4, 0xbb, 0x07, 0x9d, 0xfd, 0x11, 0x4a, 0xd9, 0xb7, 0x29, 0xdd, 0x95, 0x47, 0xaf, 0xd1,
+ 0x13, 0x68, 0xdc, 0xdc, 0x4b, 0x62, 0x3a, 0x50, 0x17, 0x1b, 0x03, 0xbd, 0x80, 0x66, 0x16, 0xdf,
+ 0x90, 0xcc, 0x54, 0x23, 0x18, 0x9d, 0x1d, 0x5e, 0x77, 0x78, 0xa9, 0xb7, 0x2f, 0xa8, 0xe4, 0xf7,
+ 0xd8, 0x72, 0xfb, 0x2f, 0x21, 0x70, 0x60, 0xf5, 0xb4, 0x6e, 0xc9, 0xbd, 0x3d, 0x4d, 0x2d, 0xd5,
+ 0x61, 0xdb, 0x38, 0x2b, 0x88, 0x55, 0x80, 0x31, 0xbe, 0xf6, 0xbf, 0xf2, 0xa2, 0x3f, 0x3d, 0x08,
+ 0x9c, 0xd4, 0xa3, 0xcf, 0xa0, 0xbe, 0x61, 0x4b, 0xd3, 0x35, 0x4f, 0x47, 0xe1, 0xc3, 0xe2, 0x0c,
+ 0xdf, 0xb1, 0x25, 0x11, 0x58, 0xb3, 0x54, 0x60, 0x39, 0xe3, 0x52, 0xbb, 0x6d, 0x60, 0xbd, 0x56,
+ 0xaa, 0x49, 0xa9, 0x24, 0x7c, 0x1b, 0x67, 0xfa, 0x29, 0x37, 0xf0, 0xde, 0x56, 0xf7, 0x50, 0x2d,
+ 0x55, 0x68, 0x39, 0x35, 0xb0, 0x31, 0xd4, 0x7d, 0x0b, 0x9e, 0x69, 0x21, 0x75, 0xb0, 0x5a, 0x46,
+ 0xe7, 0xd0, 0xd0, 0xc7, 0x54, 0xcb, 0xd0, 0x86, 0xfa, 0x6c, 0x31, 0x7f, 0x6b, 0x0a, 0x31, 0x5b,
+ 0x5c, 0x5e, 0xf6, 0xfc, 0xe8, 0x6f, 0x0f, 0xba, 0xef, 0xe2, 0x3c, 0x4f, 0xe9, 0xca, 0x04, 0x3f,
+ 0x80, 0x20, 0xc9, 0x0a, 0x21, 0x09, 0x77, 0xbb, 0xbf, 0x03, 0x3d, 0x6c, 0x03, 0xfe, 0xb1, 0x36,
+ 0x10, 0x41, 0x97, 0xf1, 0x95, 0xed, 0x24, 0x93, 0xb1, 0xed, 0x4a, 0x15, 0x4c, 0x79, 0x62, 0x7c,
+ 0xb5, 0x93, 0xfb, 0x64, 0x6c, 0x5f, 0x49, 0x15, 0x54, 0x9e, 0x92, 0x82, 0x97, 0x9e, 0x4c, 0x9c,
+ 0x15, 0x4c, 0x79, 0x4a, 0x0a, 0xee, 0x78, 0xb2, 0xad, 0xa9, 0x02, 0x8e, 0xfe, 0xf0, 0xa0, 0xae,
+ 0x1e, 0x18, 0xfa, 0x1c, 0xea, 0x6a, 0xea, 0xa0, 0xdd, 0xe3, 0x71, 0x46, 0x50, 0xff, 0xb1, 0xf3,
+ 0x12, 0x55, 0xeb, 0x8f, 0x4e, 0xd0, 0x18, 0xde, 0xb3, 0x33, 0xa4, 0x9c, 0x0e, 0xe8, 0x03, 0xcb,
+ 0xab, 0xce, 0xa0, 0xfe, 0xd3, 0x43, 0xd8, 0x0c, 0x9d, 0xe8, 0x04, 0x7d, 0x0b, 0x8f, 0x2a, 0x33,
+ 0x0f, 0x7d, 0x64, 0xa9, 0xc7, 0x26, 0xe1, 0x91, 0x6b, 0xbc, 0xea, 0xfc, 0xdc, 0x1a, 0x7e, 0xa3,
+ 0xd1, 0x9b, 0xa6, 0xfe, 0x7c, 0xf9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x37, 0x82, 0xde, 0xa9,
+ 0xd0, 0x07, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -763,56 +780,3 @@ var _Sync_serviceDesc = grpc.ServiceDesc{
Streams: []grpc.StreamDesc{},
Metadata: "syncer.proto",
}
-
-func init() { proto1.RegisterFile("syncer.proto", fileDescriptor0) }
-
-var fileDescriptor0 = []byte{
- // 751 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x6e, 0xdb, 0x48,
- 0x0c, 0x8e, 0x64, 0xf9, 0x8f, 0xf2, 0x66, 0x85, 0xd9, 0x3d, 0x68, 0x8d, 0x60, 0x61, 0x08, 0x0b,
- 0xac, 0x0f, 0xbb, 0x46, 0xe3, 0xa6, 0x40, 0xdb, 0x5b, 0x11, 0xbb, 0x89, 0xd1, 0xc4, 0x31, 0xc6,
- 0x71, 0x7b, 0xea, 0x41, 0x91, 0x07, 0xb6, 0x1a, 0x79, 0xa4, 0x6a, 0x46, 0x46, 0xfd, 0x40, 0xed,
- 0x5b, 0xf4, 0x61, 0x7a, 0xed, 0x53, 0x14, 0xf3, 0x63, 0x7b, 0xe4, 0xe4, 0xd0, 0x43, 0x4f, 0x26,
- 0x3f, 0x52, 0x1c, 0x92, 0x1f, 0x49, 0x43, 0x8b, 0x6d, 0x68, 0x44, 0xf2, 0x5e, 0x96, 0xa7, 0x3c,
- 0x45, 0x55, 0xf9, 0x13, 0xbc, 0x07, 0x77, 0x52, 0x24, 0x09, 0x26, 0x1f, 0x0b, 0xc2, 0x38, 0xea,
- 0x80, 0xcb, 0x48, 0xbe, 0x8e, 0x23, 0x32, 0x0e, 0x57, 0xc4, 0xb7, 0x3a, 0x56, 0xb7, 0x89, 0x4d,
- 0x08, 0xf9, 0x50, 0x4f, 0x33, 0x1e, 0xa7, 0x94, 0xf9, 0xb6, 0xb4, 0x6e, 0x55, 0x84, 0xc0, 0xe1,
- 0xf1, 0x8a, 0xf8, 0x15, 0x09, 0x4b, 0x39, 0x58, 0x41, 0x63, 0xba, 0xa1, 0xd1, 0x20, 0xe4, 0x21,
- 0xea, 0x41, 0x43, 0x07, 0x62, 0xbe, 0xd5, 0xa9, 0x74, 0xdd, 0x3e, 0x52, 0xb9, 0xf4, 0x84, 0xcb,
- 0x54, 0x99, 0xf0, 0xce, 0x07, 0x9d, 0x42, 0x33, 0xa6, 0x8c, 0x87, 0x54, 0x7c, 0x60, 0xcb, 0x0f,
- 0xfe, 0x30, 0x3e, 0x18, 0x69, 0x1b, 0xde, 0x7b, 0x05, 0xdf, 0x6c, 0x70, 0x8d, 0x60, 0xe8, 0x04,
- 0x9a, 0x3a, 0xdc, 0x68, 0xae, 0x8b, 0xd9, 0x03, 0xc8, 0x83, 0x4a, 0x98, 0x65, 0xba, 0x0c, 0x21,
- 0x8a, 0x12, 0x68, 0xb8, 0x2f, 0x81, 0xea, 0x82, 0xd7, 0x24, 0x67, 0x71, 0x4a, 0x7d, 0x47, 0x15,
- 0xac, 0x55, 0x74, 0x0a, 0x35, 0xc6, 0x43, 0x5e, 0x30, 0xbf, 0xda, 0xb1, 0xba, 0xc7, 0xfd, 0xbf,
- 0x1e, 0x96, 0xd3, 0x9b, 0x4a, 0x07, 0xac, 0x1d, 0xd1, 0x3f, 0xf0, 0xdb, 0x3c, 0x5d, 0x85, 0x31,
- 0x9d, 0xe4, 0xe9, 0x07, 0x12, 0x71, 0xbf, 0x26, 0x43, 0x96, 0x41, 0xc1, 0x02, 0xa1, 0xeb, 0x38,
- 0x4f, 0xe9, 0x8a, 0x50, 0xee, 0xd7, 0x15, 0x0b, 0x06, 0x84, 0xfe, 0x06, 0xc8, 0x92, 0x62, 0x11,
- 0x53, 0x49, 0x53, 0x43, 0x3a, 0x18, 0x08, 0x7a, 0x02, 0x40, 0x3e, 0x65, 0x21, 0x65, 0x92, 0xa8,
- 0xa6, 0x6c, 0x9e, 0xa7, 0xd3, 0x1b, 0x6e, 0x0d, 0xd8, 0xf0, 0x09, 0xfe, 0x85, 0x9a, 0xca, 0x15,
- 0xb9, 0x50, 0x9f, 0x8d, 0xdf, 0x8c, 0x6f, 0xde, 0x8d, 0xbd, 0x23, 0x54, 0x03, 0x7b, 0x36, 0xf1,
- 0x2c, 0xd4, 0x00, 0x67, 0x20, 0x10, 0x3b, 0xf8, 0x5c, 0x81, 0x96, 0xd9, 0x7f, 0x91, 0xcb, 0x96,
- 0x81, 0x5d, 0x97, 0x0d, 0xa4, 0x4c, 0x82, 0x7d, 0x48, 0xc2, 0x09, 0x34, 0x09, 0x9d, 0x67, 0x69,
- 0x4c, 0x39, 0xf3, 0x2b, 0x9d, 0x8a, 0xb0, 0xee, 0x00, 0xd4, 0x86, 0xc6, 0x32, 0x65, 0x5c, 0x56,
- 0xa9, 0xba, 0xbf, 0xd3, 0x51, 0xff, 0xa0, 0xfd, 0xed, 0x47, 0x86, 0xe3, 0xb0, 0xff, 0x67, 0xe0,
- 0x2e, 0x49, 0x98, 0xf0, 0xe5, 0xf9, 0x92, 0x44, 0xf7, 0xb2, 0xfb, 0xfb, 0x31, 0xbc, 0xdc, 0x5b,
- 0xb0, 0xe9, 0x66, 0x8e, 0x40, 0xbd, 0x3c, 0x02, 0xbf, 0x9e, 0x87, 0x8b, 0x9f, 0xe4, 0x01, 0xb5,
- 0xa0, 0x31, 0xbd, 0x7d, 0x85, 0x6f, 0x47, 0xe3, 0x0b, 0xaf, 0x82, 0x3c, 0x68, 0xdd, 0xcc, 0x6e,
- 0x6f, 0x5e, 0x4f, 0x87, 0xf8, 0xed, 0xe8, 0x7c, 0xe8, 0x39, 0xc1, 0x17, 0x0b, 0x9a, 0xbb, 0x27,
- 0xc4, 0x64, 0xdf, 0xc7, 0x74, 0x4b, 0x8f, 0x94, 0xd1, 0x9f, 0x50, 0xbd, 0xdb, 0x70, 0xa2, 0x16,
- 0xb9, 0x85, 0x95, 0x82, 0xce, 0xa0, 0x96, 0x84, 0x77, 0x24, 0x51, 0x6c, 0xb8, 0xfd, 0x93, 0xc3,
- 0x74, 0x7b, 0x57, 0xd2, 0x3c, 0xa4, 0x3c, 0xdf, 0x60, 0xed, 0xdb, 0x7e, 0x01, 0xae, 0x01, 0x8b,
- 0xd5, 0xba, 0x27, 0x1b, 0xfd, 0x9a, 0x10, 0xc5, 0x63, 0xeb, 0x30, 0x29, 0x88, 0x9e, 0x00, 0xa5,
- 0xbc, 0xb4, 0x9f, 0x5b, 0xc1, 0x57, 0x0b, 0x5c, 0xa3, 0xf5, 0xe8, 0x3f, 0x70, 0x56, 0xe9, 0x5c,
- 0x1d, 0x9f, 0xe3, 0xbe, 0xff, 0x90, 0x9c, 0xde, 0x75, 0x3a, 0x27, 0x0c, 0x4b, 0x2f, 0x51, 0x58,
- 0x96, 0xe6, 0x5c, 0x86, 0xad, 0x62, 0x29, 0x8b, 0xa9, 0x89, 0x29, 0x27, 0xf9, 0x3a, 0x4c, 0xe4,
- 0x2a, 0x57, 0xf1, 0x4e, 0x17, 0x79, 0x88, 0xcb, 0xc4, 0xe4, 0x38, 0x55, 0xb1, 0x52, 0x44, 0xbe,
- 0x45, 0x9e, 0xc8, 0x41, 0x6a, 0x62, 0x21, 0x06, 0x5d, 0xa8, 0xca, 0x67, 0xca, 0x34, 0x34, 0xc0,
- 0x99, 0xcc, 0xa6, 0x97, 0x8a, 0x88, 0xc9, 0xec, 0xea, 0xca, 0xb3, 0x83, 0xef, 0x16, 0xb4, 0xae,
- 0xc3, 0x2c, 0x8b, 0xe9, 0x42, 0x15, 0xdf, 0x01, 0x37, 0x4a, 0x0a, 0xc6, 0x49, 0x6e, 0x1e, 0x51,
- 0x03, 0x7a, 0x78, 0x06, 0xec, 0xc7, 0xce, 0x40, 0x00, 0xad, 0x34, 0x5f, 0xe8, 0x4b, 0x32, 0x1a,
- 0xe8, 0xab, 0x54, 0xc2, 0x44, 0xa4, 0x34, 0x5f, 0x6c, 0xc7, 0x7d, 0x34, 0xd0, 0x5b, 0x52, 0x06,
- 0x45, 0xa4, 0xa8, 0xc8, 0xf7, 0x91, 0x54, 0x9d, 0x25, 0x4c, 0x44, 0x8a, 0x8a, 0xdc, 0x88, 0xa4,
- 0x4f, 0x53, 0x09, 0xec, 0x3f, 0x03, 0x47, 0xec, 0x17, 0xfa, 0x1f, 0x1c, 0xf1, 0xbf, 0x81, 0xb6,
- 0xbb, 0x63, 0xfc, 0x89, 0xb4, 0x7f, 0x37, 0x16, 0x51, 0x5c, 0xfe, 0xe0, 0xe8, 0xae, 0x26, 0x91,
- 0xa7, 0x3f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x75, 0xe0, 0xda, 0xd0, 0x84, 0x06, 0x00, 0x00,
-}
diff --git a/syncer/proto/syncer.proto b/syncer/proto/syncer.proto
index ea55e55..66a563f 100644
--- a/syncer/proto/syncer.proto
+++ b/syncer/proto/syncer.proto
@@ -6,10 +6,12 @@ message PullRequest {
string serviceName = 1;
string options = 2;
string time = 3;
+ string addr = 4;
}
message IncrementPullRequest {
string addr = 1;
+ int64 length = 2;
}
message DeclareRequest {
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index 6d0e744..0716db4 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -21,10 +21,9 @@ import (
"context"
"crypto/tls"
"fmt"
- "math"
"strconv"
- "time"
+ "github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
@@ -37,6 +36,7 @@ const (
EventDiscovered = "discovered"
EventIncrementPulled = "incrementPulled"
EventNotifyFullPulled = "notifyFullPulled"
+ ChannelBufferSize = 1000
)
// tickHandler Timed task handler
@@ -70,69 +70,16 @@ func (s *Server) tickHandler() {
}
}
-func (s *Server) DataRemoveTickHandler() chan bool {
- ticker := time.NewTicker(time.Second * 30)
- stopChan := make(chan bool)
- go func(trick *time.Ticker) {
- //defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- s.eventQueueDataRemoveTickHandler()
- log.Info(fmt.Sprintf("size of records map = %d, size of events slice = %d", len(s.revisionMap), len(s.eventQueue)))
- case stop := <-stopChan:
- if stop {
- log.Info("data remove ticker stop")
- return
- }
- case <-context.Background().Done():
- return
- }
- }
- }(ticker)
- return stopChan
-}
-
-func (s *Server) eventQueueDataRemoveTickHandler() {
- if len(s.revisionMap) == 0 || len(s.eventQueue) == 0 {
- log.Info("RevisionMap or EventQueue is empty")
- return
- }
-
- log.Info(fmt.Sprintf("length of map : %d", len(s.revisionMap)))
- var minRevision int64 = math.MaxInt64
- for _, value := range s.revisionMap {
- var tempRevision = value.revision
- if tempRevision < minRevision {
- minRevision = tempRevision
- }
- }
-
- log.Info(fmt.Sprintf("revision of item will remove : %d", minRevision))
-
- j := 0
- for _, value := range s.eventQueue {
- var tempRevision = value.Revision
- if tempRevision == minRevision {
- break
- }
- j++
- }
- s.eventQueue = s.eventQueue[j:]
- log.Info(fmt.Sprintf("size of event queue : %d", len(s.eventQueue)))
- if len(s.eventQueue) > 0 {
- log.Info(fmt.Sprintf("revision of first element in event queue : %d", s.eventQueue[0].Revision))
- }
-}
-
// Pull returns sync data of servicecenter
-func (s *Server) Pull(context.Context, *pb.PullRequest) (*pb.SyncData, error) {
+func (s *Server) Pull(ctx context.Context, req *pb.PullRequest) (*pb.SyncData, error) {
+ if _, ok := s.channelMap[req.GetAddr()]; !ok {
+ s.channelMap[req.GetAddr()] = make(chan *dump.WatchInstanceChangedEvent, ChannelBufferSize)
+ }
return s.servicecenter.Discovery(), nil
}
func (s *Server) IncrementPull(ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
- incrementQueue := s.GetIncrementQueue(req.GetAddr())
- s.updateRevisionMap(req.GetAddr(), incrementQueue)
+ incrementQueue := s.GetIncrementQueue(req)
return s.GetIncrementData(ctx, incrementQueue), nil
}
@@ -179,7 +126,7 @@ func (s *Server) userEvent(data ...[]byte) (success bool) {
}
cli := client.NewSyncClient(endpoint, tlsConfig)
- syncData, err := cli.Pull(context.Background())
+ syncData, err := cli.Pull(context.Background(), s.conf.Listener.RPCAddr)
if err != nil {
log.Errorf(err, "Pull other serf instances failed, node name is '%s'", members[0].Name)
return
@@ -234,7 +181,8 @@ func (s *Server) incrementUserEvent(data ...[]byte) (success bool) {
syncDataLength := declareResponse.SyncDataLength
if syncDataLength != 0 {
- syncData, err := cli.IncrementPull(context.Background(), s.conf.Listener.RPCAddr)
+ syncData, err := cli.IncrementPull(
+ context.Background(), &pb.IncrementPullRequest{Addr: s.conf.Listener.RPCAddr, Length: syncDataLength})
if err != nil {
log.Error(fmt.Sprintf("IncrementPull other serf instances failed, node name is '%s'", members[0].Name), err)
return
diff --git a/syncer/server/handler_test.go b/syncer/server/handler_test.go
index 72e7d24..fd0ee12 100644
--- a/syncer/server/handler_test.go
+++ b/syncer/server/handler_test.go
@@ -3,12 +3,8 @@ package server
import (
"context"
"errors"
- "fmt"
- "log"
"strconv"
- "sync"
"testing"
- "time"
"github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/syncer/config"
@@ -20,57 +16,14 @@ import (
var s Server
-func TestServer_DataRemoveTickHandler(t *testing.T) {
- log.Print("start")
- var actions = []string{"CREATE", "UPDATE", "DELETE"}
- var s Server
-
- var recoders = make(map[string]record, 9)
- var events = make([]*dump.WatchInstanceChangedEvent, 10)
- s.revisionMap = recoders
- s.eventQueue = events
-
- for i := 2; i < 10; i++ {
- var recoder = new(record)
- recoder.revision = int64(i)
- s.revisionMap[strconv.FormatInt(int64(i), 10)] = *recoder
- }
- log.Printf("size of records map = %d", len(s.revisionMap))
-
- for i := 0; i < 10; i++ {
- var event = new(dump.WatchInstanceChangedEvent)
- event.Revision = int64(i)
-
- event.Action = actions[i%3]
-
- s.eventQueue[i] = event
- }
-
- removeMapElement(s.revisionMap, 2)
- t.Run("start update queue stop after 15 second", func(t *testing.T) {
- ch := s.DataRemoveTickHandler()
- time.Sleep(15 * time.Second)
- ch <- true
- close(ch)
- })
-
-}
-
func TestServer_IncrementPull(t *testing.T) {
confCreate()
- s.revisionMap, s.eventQueue = getMapAndQueue(9, 10)
- t.Run("increment when address is new", func(t *testing.T) {
- iPReq := pb.IncrementPullRequest{
- Addr: "171.0.0.1",
- }
- syncData, err := s.IncrementPull(context.Background(), &iPReq)
- assert.NoError(t, err, "no error when DeclareDataLength")
- assert.NotEmpty(t, syncData, "increase succeed")
- })
+ s.channelMap = setChannel()
t.Run("increment when address exist", func(t *testing.T) {
iPReq := pb.IncrementPullRequest{
- Addr: "1",
+ Addr: "1",
+ Length: 1,
}
syncData, err := s.IncrementPull(context.Background(), &iPReq)
assert.NoError(t, err, "no error when DeclareDataLength")
@@ -79,17 +32,7 @@ func TestServer_IncrementPull(t *testing.T) {
}
func TestServer_DeclareDataLength(t *testing.T) {
- s.revisionMap, s.eventQueue = getMapAndQueue(9, 10)
- t.Run("run with new address", func(t *testing.T) {
- dReq := pb.DeclareRequest{
- Addr: "http://127.0.0.1",
- }
- declareResp, err := s.DeclareDataLength(context.Background(), &dReq)
-
- assert.NoError(t, err, "error when DeclareDataLength")
- assert.Empty(t, err, "declareResp.SyncDataLength is empty")
- assert.NotZero(t, declareResp.SyncDataLength, "declareResp.SyncDataLength is empty")
- })
+ s.channelMap = setChannel()
t.Run("when address exist", func(t *testing.T) {
dReq := pb.DeclareRequest{
Addr: "3",
@@ -118,33 +61,24 @@ func TestService_incrementUserEvent(t *testing.T) {
})
}
-func getMapAndQueue(mapSize int, queueSize int) (map[string]record, []*dump.WatchInstanceChangedEvent) {
- var actions = []string{"CREATE", "UPDATE", "DELETE"}
-
- var recoders = make(map[string]record, mapSize)
- var events = make([]*dump.WatchInstanceChangedEvent, queueSize)
-
- for i := 2; i < queueSize; i++ {
- var recoder = new(record)
- recoder.revision = int64(i)
-
- recoder.action = actions[i%3]
-
- recoders[strconv.FormatInt(int64(i), 10)] = *recoder
- }
- log.Printf("size of records map = %d", len(s.revisionMap))
-
- for i := 0; i < queueSize; i++ {
- var event = new(dump.WatchInstanceChangedEvent)
- var event1 = instanceAndServiceCreate(i)
- event.Revision = int64(i)
- event.Action = actions[i%3]
- event.Service = event1.Service
- event.Instance = event1.Instance
-
- events[i] = event
+func setChannel() map[string]chan *dump.WatchInstanceChangedEvent {
+ var channelMap = make(map[string]chan *dump.WatchInstanceChangedEvent)
+ var ch1 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+ var ch2 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+ var ch3 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+ channelMap["1"] = ch1
+ channelMap["2"] = ch2
+ channelMap["3"] = ch3
+
+ var event1 = instanceAndServiceCreate(1)
+
+ for _, ch := range channelMap {
+ select {
+ case ch <- event1:
+ default:
+ }
}
- return recoders, events
+ return channelMap
}
func confCreate() {
@@ -262,7 +196,6 @@ func instanceAndServiceCreate(i int) *dump.WatchInstanceChangedEvent {
is.Value = iv
is.Rev = int64(i)
- event.Revision = int64(i)
event.Instance = is
event.Service = ss
@@ -277,20 +210,3 @@ func defaultServer() *serf.Server {
serf.WithBindPort(35151),
)
}
-
-func removeMapElement(events map[string]record, start int64) {
- var mutex sync.Mutex
- testTicker := time.NewTicker(time.Second * 1)
- go func(tick *time.Ticker) {
- for k := start; k < 10; k++ {
- <-testTicker.C
- mutex.Lock()
- delete(events, strconv.FormatInt(int64(k), 10))
- mutex.Unlock()
- fmt.Println("remove element in map")
- }
- }(testTicker)
- if len(events) == 0 {
- defer testTicker.Stop()
- }
-}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 8d5ccbd..7212dee 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -20,7 +20,6 @@ package server
import (
"context"
"errors"
- "fmt"
"strconv"
"sync"
"syscall"
@@ -86,38 +85,26 @@ type Server struct {
// Wraps the grpc server
grpc *grpc.Server
- revisionMap map[string]record
-
- eventQueue []*dump.WatchInstanceChangedEvent
-
- mapLock sync.RWMutex
-
- queueLock sync.RWMutex
-
mux sync.RWMutex
triggered bool
+ channelMap map[string]chan *dump.WatchInstanceChangedEvent
+
// The channel will be closed when receiving a system interrupt signal
stopCh chan struct{}
}
-type record struct {
- revision int64
- action string
-}
-
// NewServer new server with Config
func NewServer(conf *config.Config) *Server {
ctx, cancel := context.WithCancel(context.Background())
return &Server{
- ctx: ctx,
- cancel: cancel,
- conf: conf,
- stopCh: make(chan struct{}),
- revisionMap: make(map[string]record),
- eventQueue: make([]*dump.WatchInstanceChangedEvent, 0),
- triggered: true,
+ ctx: ctx,
+ cancel: cancel,
+ conf: conf,
+ stopCh: make(chan struct{}),
+ triggered: true,
+ channelMap: make(map[string]chan *dump.WatchInstanceChangedEvent),
}
}
@@ -151,8 +138,6 @@ func (s *Server) Run(ctx context.Context) {
s.task.Handle(s.tickHandler)
- s.DataRemoveTickHandler()
-
s.task.Run(ctx)
go s.NewHTTPServer()
@@ -302,6 +287,13 @@ func (s *Server) watchInstance() error {
return nil
}
+func instFromOtherSC(instance *dump.Instance, m *pb.MappingEntry) bool {
+ if instance.Value.InstanceId == m.CurInstanceID && m.OrgInstanceID != "" {
+ return true
+ }
+ return false
+}
+
func (s *Server) addToQueue(event *dump.WatchInstanceChangedEvent) {
mapping := s.servicecenter.GetSyncMapping()
@@ -313,96 +305,52 @@ func (s *Server) addToQueue(event *dump.WatchInstanceChangedEvent) {
}
}
- s.queueLock.Lock()
- s.eventQueue = append(s.eventQueue, event)
- log.Debugf("success add instance event to queue:%s len:%s", event, len(s.eventQueue))
- s.queueLock.Unlock()
-}
-
-func instFromOtherSC(instance *dump.Instance, m *pb.MappingEntry) bool {
- if instance.Value.InstanceId == m.CurInstanceID && m.OrgInstanceID != "" {
- return true
- }
- return false
-}
-
-func (s *Server) getRevision(addr string) int64 {
- s.mapLock.RLock()
- value, ok := s.revisionMap[addr]
- s.mapLock.RUnlock()
- if ok {
- return value.revision
+ for _, ch := range s.channelMap {
+ select {
+ case ch <- event:
+ log.Info("add event to queue")
+ default:
+ log.Info("channel buffer is full")
+ }
}
- return -1
}
-func (s *Server) getAction(addr string) string {
- s.mapLock.RLock()
- value, ok := s.revisionMap[addr]
- s.mapLock.RUnlock()
+func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) {
+ ch, ok := s.channelMap[addr]
+ var length int64
if ok {
- return value.action
+ length = int64(len(ch))
+ } else {
+ length = 0
+ log.Error("fail to find the specific channel according to the addr", utils.ErrChannelSearch)
}
- return ""
-}
-
-func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) {
response = &pb.DeclareResponse{
- SyncDataLength: int64(len(s.GetIncrementQueue(addr))),
+ SyncDataLength: length,
}
return response
}
-func (s *Server) updateRevisionMap(addr string, incrementQueue []*dump.WatchInstanceChangedEvent) {
- if len(incrementQueue) == 0 {
- log.Info("incrementQueue is empty, no need to update RevisionMap")
- return
- }
-
- log.Debug(fmt.Sprintf("update RevisionMap, addr = %s", addr))
- s.mapLock.Lock()
- s.revisionMap[addr] = record{
- incrementQueue[len(incrementQueue)-1].Revision,
- incrementQueue[len(incrementQueue)-1].Action,
- }
- s.mapLock.Unlock()
-}
-
-func (s *Server) GetIncrementQueue(addr string) []*dump.WatchInstanceChangedEvent {
- revision := s.getRevision(addr)
- action := s.getAction(addr)
-
- s.queueLock.RLock()
- defer s.queueLock.RUnlock()
-
- length := len(s.eventQueue)
- if length == 0 {
- log.Info("eventQueue is empty")
+func (s *Server) GetIncrementQueue(req *pb.IncrementPullRequest) (queue []*dump.WatchInstanceChangedEvent) {
+ ch, ok := s.channelMap[req.GetAddr()]
+ if !ok {
+ log.Debug("fail to find the queue according to the addr")
return nil
}
- if revision == -1 {
- return s.eventQueue
- }
-
- index := 0
- for _, event := range s.eventQueue {
- if event.Revision == revision && event.Action == action {
- break
+ queue = make([]*dump.WatchInstanceChangedEvent, 0, len(ch))
+ for i := 0; i < int(req.GetLength()); i++ {
+ select {
+ case temp, ok := <-ch:
+ if !ok {
+ log.Debug("channel closed")
+ return
+ }
+ queue = append(queue, temp)
+ default:
+ return
}
- index++
- }
-
- if index == length-1 {
- log.Info("no incremental event in the queue")
- return nil
- }
-
- if index == length {
- log.Info(fmt.Sprintf("fail to find the event in the queue by RevisionMap, Revision = %d, Action = %s", revision, action))
- return s.eventQueue
}
- return s.eventQueue[index+1:]
+ return
}
func (s *Server) GetIncrementData(ctx context.Context, incrementQueue []*dump.WatchInstanceChangedEvent) (data *pb.SyncData) {