You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/12/14 01:49:31 UTC

[servicecomb-service-center] branch master updated: Syncer optimization: replace slice with channel in the implementation of queue (#773)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 948ba02  Syncer optimization: replace slice with channel in the implementation of queue (#773)
948ba02 is described below

commit 948ba0267b70eb0aa9f41b8e413c876098a26afd
Author: lilai23 <46...@users.noreply.github.com>
AuthorDate: Mon Dec 14 09:49:21 2020 +0800

    Syncer optimization: replace slice with channel in the implementation of queue (#773)
    
    * Syncer optimization: replace slice with channel in the implementation of queue
    
    * - 确保预声明数据量与实际同步相同
    - 更新文档
    
    * 更新UT
    
    * update readme
---
 datasource/etcd/event/instance_event_handler.go |   1 -
 pkg/dump/dump.go                                |   1 -
 server/syncernotify/websocket_test.go           |   1 -
 syncer/README-ZH.md                             |   9 +-
 syncer/README.md                                |  14 +-
 syncer/client/sync_client.go                    |   8 +-
 syncer/client/sync_client_test.go               |   6 +-
 syncer/pkg/utils/error.go                       |   1 +
 syncer/proto/syncer.pb.go                       | 252 ++++++++++--------------
 syncer/proto/syncer.proto                       |   2 +
 syncer/server/handler.go                        |  72 +------
 syncer/server/handler_test.go                   | 126 ++----------
 syncer/server/server.go                         | 144 +++++---------
 13 files changed, 216 insertions(+), 421 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index dfe2333..ae5919a 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -168,7 +168,6 @@ func NotifySyncerInstanceEvent(evt sd.KvEvent, domainProject string, ms *pb.Micr
 		Action:   string(evt.Type),
 		Service:  service,
 		Instance: instance,
-		Revision: evt.Revision,
 	}
 
 	syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
diff --git a/pkg/dump/dump.go b/pkg/dump/dump.go
index d4cc8e3..20e462e 100644
--- a/pkg/dump/dump.go
+++ b/pkg/dump/dump.go
@@ -227,5 +227,4 @@ type WatchInstanceChangedEvent struct {
 	Action   string        `protobuf:"bytes,1,opt,name=action" json:"action,omitempty"`
 	Service  *Microservice `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"`
 	Instance *Instance     `protobuf:"bytes,3,opt,name=instance" json:"instance,omitempty"`
-	Revision int64         `protobuf:"bytes,4,opt,name=revision" json:"revision,omitempty"`
 }
diff --git a/server/syncernotify/websocket_test.go b/server/syncernotify/websocket_test.go
index be9f927..9dd77d6 100644
--- a/server/syncernotify/websocket_test.go
+++ b/server/syncernotify/websocket_test.go
@@ -117,7 +117,6 @@ func TestDoWebSocketWatch(t *testing.T) {
 					HostName: "sunlisen",
 				},
 			},
-			Revision: 10,
 		}
 
 		GetSyncerNotifyCenter().AddEvent(instanceEvent)
diff --git a/syncer/README-ZH.md b/syncer/README-ZH.md
index dc99363..80dabf0 100644
--- a/syncer/README-ZH.md
+++ b/syncer/README-ZH.md
@@ -57,6 +57,12 @@ $ go build
 - cluster-port: 当mode为“cluster”时,Syncer集群成员之间进行通信的端口
 - node:当mode为“cluster”时,syncer集群成员名称。
 
+###### 同步模式说明
+- 增量同步:默认的同步模式。
+- 全量同步:每个syncer启动加入时自动触发一次,也可以在增量同步异常告警之后手动调用接口触发。
+```bash
+$ curl http://localhost:30300/v1/syncer/full-synchronization
+```
 
 假设有2个服务中心,每个服务中心都有一个用于微服务发现和注册的服务中心集群,如下所示:   
 
@@ -110,4 +116,5 @@ Syncer是一个开发中版本,在下面列出已支持的特性,更多开
 
 - 支持多个servicecomb-service-center 服务中心之间进行数据同步
 - 在etcd中固化存储微服务实例映射表
-- 支持集群模式部署Syncer,每个syncer集群拥有3个实例
\ No newline at end of file
+- 支持集群模式部署Syncer,每个syncer集群拥有3个实例
+- 支持增量同步为主要同步机制,每个syncer加入时触发一次全量同步,增量同步异常告警后也可手动进行全量同步
\ No newline at end of file
diff --git a/syncer/README.md b/syncer/README.md
index ff34a5b..1d94e88 100644
--- a/syncer/README.md
+++ b/syncer/README.md
@@ -80,8 +80,19 @@ $ go build
 
   Member name of Syncer cluster when mode is set to be "cluster".
   
+###### Synchronization Mode Description
+- Incremental Synchronization:
+
+  Default synchronization mode.
   
+- Full Synchronization:
 
+  A full synchronization is triggered automatically once when each syncer starts and joins the cluster.
+  It can also be triggered manually by calling the interface below after incremental synchronization raises an exception alarm.
+
+```bash
+$ curl http://localhost:30300/v1/syncer/full-synchronization
+```
 
 Suppose there are 2 Service centers, each of them with a Service-center cluster for microservices discovery and registry, as following,   
 
@@ -136,4 +147,5 @@ Syncer is in developing progress, reference to [TODO](./TODO.md) to get more dev
 - Data synchronization among multiple servicecomb-service-centers
 - Solidify the mapping table of micro-service instances into etcd
 - Support Syncer cluster mode, each Syncer has 3 instances
-
+- Support incremental synchronization as the main synchronization mechanism.
+  A full synchronization is triggered once when each syncer joins, and can also be triggered manually after incremental synchronization raises an exception alarm.
diff --git a/syncer/client/sync_client.go b/syncer/client/sync_client.go
index ae71d5d..a425e2f 100644
--- a/syncer/client/sync_client.go
+++ b/syncer/client/sync_client.go
@@ -61,8 +61,8 @@ func NewSyncClient(addr string, tlsConf *tls.Config) (cli *Client) {
 }
 
 // Pull implement the interface of sync server
-func (c *Client) Pull(ctx context.Context) (*pb.SyncData, error) {
-	data, err := c.cli.Pull(ctx, &pb.PullRequest{})
+func (c *Client) Pull(ctx context.Context, addr string) (*pb.SyncData, error) {
+	data, err := c.cli.Pull(ctx, &pb.PullRequest{Addr: addr})
 	if err != nil {
 		log.Errorf(err, "Pull from grpc client failed, going to close the client")
 		closeClient(c.addr)
@@ -70,8 +70,8 @@ func (c *Client) Pull(ctx context.Context) (*pb.SyncData, error) {
 	return data, err
 }
 
-func (c *Client) IncrementPull(ctx context.Context, addr string) (*pb.SyncData, error) {
-	data, err := c.cli.IncrementPull(ctx, &pb.IncrementPullRequest{Addr: addr})
+func (c *Client) IncrementPull(ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
+	data, err := c.cli.IncrementPull(ctx, req)
 	if err != nil {
 		log.Error("Pull from grpc client failed, going to close the client", err)
 		closeClient(c.addr)
diff --git a/syncer/client/sync_client_test.go b/syncer/client/sync_client_test.go
index f666c7b..1317687 100644
--- a/syncer/client/sync_client_test.go
+++ b/syncer/client/sync_client_test.go
@@ -15,18 +15,18 @@ var c = NewSyncClient("", new(tls.Config))
 
 func TestClient_IncrementPull(t *testing.T) {
 	t.Run("Test IncrementPull", func(t *testing.T) {
-		_, err := c.IncrementPull(context.Background(), "http://127.0.0.1")
+		_, err := c.IncrementPull(context.Background(), &pb.IncrementPullRequest{Addr: "http://127.0.0.1", Length: 3})
 		assert.Error(t, err, "IncrementPull fail without grpc")
 	})
 	t.Run("Test IncrementPull", func(t *testing.T) {
 		defer monkey.UnpatchAll()
 
 		monkey.PatchInstanceMethod(reflect.TypeOf((*Client)(nil)),
-			"IncrementPull", func(client *Client, ctx context.Context, string2 string) (*pb.SyncData, error) {
+			"IncrementPull", func(client *Client, ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
 				return syncDataCreate(), nil
 			})
 
-		syncData, err := c.IncrementPull(context.Background(), "http://127.0.0.1")
+		syncData, err := c.IncrementPull(context.Background(), &pb.IncrementPullRequest{Addr: "http://127.0.0.1", Length: 3})
 		assert.NoError(t, err, "IncrementPull no err when client exist")
 		assert.NotNil(t, syncData, "syncData not nil when client exist")
 	})
diff --git a/syncer/pkg/utils/error.go b/syncer/pkg/utils/error.go
index 2c06673..19e922f 100644
--- a/syncer/pkg/utils/error.go
+++ b/syncer/pkg/utils/error.go
@@ -26,4 +26,5 @@ var (
 	ErrActionInvalid  = errors.New("action invalid")
 	ErrMappingSearch  = errors.New("mapping does not exist")
 	ErrInstanceDelete = errors.New("delete instance failed")
+	ErrChannelSearch  = errors.New("channel does not exist")
 )
diff --git a/syncer/proto/syncer.pb.go b/syncer/proto/syncer.pb.go
index 72822ea..a0919c5 100644
--- a/syncer/proto/syncer.pb.go
+++ b/syncer/proto/syncer.pb.go
@@ -23,12 +23,12 @@ import (
 	fmt "fmt"
 	math "math"
 
-	proto1 "github.com/golang/protobuf/proto"
+	proto "github.com/golang/protobuf/proto"
 	grpc "google.golang.org/grpc"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
-var _ = proto1.Marshal
+var _ = proto.Marshal
 var _ = fmt.Errorf
 var _ = math.Inf
 
@@ -36,7 +36,7 @@ var _ = math.Inf
 // is compatible with the proto package it is being compiled against.
 // A compilation error at this line likely means your copy of the
 // proto package needs to be updated.
-const _ = proto1.ProtoPackageIsVersion3 // please upgrade the proto package
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
 
 type SyncService_Status int32
 
@@ -59,7 +59,7 @@ var SyncService_Status_value = map[string]int32{
 }
 
 func (x SyncService_Status) String() string {
-	return proto1.EnumName(SyncService_Status_name, int32(x))
+	return proto.EnumName(SyncService_Status_name, int32(x))
 }
 
 func (SyncService_Status) EnumDescriptor() ([]byte, []int) {
@@ -93,7 +93,7 @@ var SyncInstance_Status_value = map[string]int32{
 }
 
 func (x SyncInstance_Status) String() string {
-	return proto1.EnumName(SyncInstance_Status_name, int32(x))
+	return proto.EnumName(SyncInstance_Status_name, int32(x))
 }
 
 func (SyncInstance_Status) EnumDescriptor() ([]byte, []int) {
@@ -121,7 +121,7 @@ var HealthCheck_Modes_value = map[string]int32{
 }
 
 func (x HealthCheck_Modes) String() string {
-	return proto1.EnumName(HealthCheck_Modes_name, int32(x))
+	return proto.EnumName(HealthCheck_Modes_name, int32(x))
 }
 
 func (HealthCheck_Modes) EnumDescriptor() ([]byte, []int) {
@@ -132,10 +132,11 @@ type PullRequest struct {
 	ServiceName string `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
 	Options     string `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"`
 	Time        string `protobuf:"bytes,3,opt,name=time,proto3" json:"time,omitempty"`
+	Addr        string `protobuf:"bytes,4,opt,name=addr,proto3" json:"addr,omitempty"`
 }
 
 func (m *PullRequest) Reset()         { *m = PullRequest{} }
-func (m *PullRequest) String() string { return proto1.CompactTextString(m) }
+func (m *PullRequest) String() string { return proto.CompactTextString(m) }
 func (*PullRequest) ProtoMessage()    {}
 func (*PullRequest) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{0}
@@ -162,12 +163,20 @@ func (m *PullRequest) GetTime() string {
 	return ""
 }
 
+func (m *PullRequest) GetAddr() string {
+	if m != nil {
+		return m.Addr
+	}
+	return ""
+}
+
 type IncrementPullRequest struct {
-	Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
+	Addr   string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
+	Length int64  `protobuf:"varint,2,opt,name=length,proto3" json:"length,omitempty"`
 }
 
 func (m *IncrementPullRequest) Reset()         { *m = IncrementPullRequest{} }
-func (m *IncrementPullRequest) String() string { return proto1.CompactTextString(m) }
+func (m *IncrementPullRequest) String() string { return proto.CompactTextString(m) }
 func (*IncrementPullRequest) ProtoMessage()    {}
 func (*IncrementPullRequest) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{1}
@@ -180,12 +189,19 @@ func (m *IncrementPullRequest) GetAddr() string {
 	return ""
 }
 
+func (m *IncrementPullRequest) GetLength() int64 {
+	if m != nil {
+		return m.Length
+	}
+	return 0
+}
+
 type DeclareRequest struct {
 	Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
 }
 
 func (m *DeclareRequest) Reset()         { *m = DeclareRequest{} }
-func (m *DeclareRequest) String() string { return proto1.CompactTextString(m) }
+func (m *DeclareRequest) String() string { return proto.CompactTextString(m) }
 func (*DeclareRequest) ProtoMessage()    {}
 func (*DeclareRequest) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{2}
@@ -203,7 +219,7 @@ type DeclareResponse struct {
 }
 
 func (m *DeclareResponse) Reset()         { *m = DeclareResponse{} }
-func (m *DeclareResponse) String() string { return proto1.CompactTextString(m) }
+func (m *DeclareResponse) String() string { return proto.CompactTextString(m) }
 func (*DeclareResponse) ProtoMessage()    {}
 func (*DeclareResponse) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{3}
@@ -222,7 +238,7 @@ type SyncData struct {
 }
 
 func (m *SyncData) Reset()         { *m = SyncData{} }
-func (m *SyncData) String() string { return proto1.CompactTextString(m) }
+func (m *SyncData) String() string { return proto.CompactTextString(m) }
 func (*SyncData) ProtoMessage()    {}
 func (*SyncData) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{4}
@@ -247,7 +263,7 @@ type SyncService struct {
 	App           string             `protobuf:"bytes,2,opt,name=app,proto3" json:"app,omitempty"`
 	Name          string             `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
 	Version       string             `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"`
-	Status        SyncService_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto1.SyncService_Status" json:"status,omitempty"`
+	Status        SyncService_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto.SyncService_Status" json:"status,omitempty"`
 	DomainProject string             `protobuf:"bytes,6,opt,name=domainProject,proto3" json:"domainProject,omitempty"`
 	Environment   string             `protobuf:"bytes,7,opt,name=environment,proto3" json:"environment,omitempty"`
 	PluginName    string             `protobuf:"bytes,8,opt,name=pluginName,proto3" json:"pluginName,omitempty"`
@@ -255,7 +271,7 @@ type SyncService struct {
 }
 
 func (m *SyncService) Reset()         { *m = SyncService{} }
-func (m *SyncService) String() string { return proto1.CompactTextString(m) }
+func (m *SyncService) String() string { return proto.CompactTextString(m) }
 func (*SyncService) ProtoMessage()    {}
 func (*SyncService) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{5}
@@ -329,7 +345,7 @@ type SyncInstance struct {
 	ServiceId   string              `protobuf:"bytes,2,opt,name=serviceId,proto3" json:"serviceId,omitempty"`
 	Endpoints   []string            `protobuf:"bytes,3,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
 	HostName    string              `protobuf:"bytes,4,opt,name=hostName,proto3" json:"hostName,omitempty"`
-	Status      SyncInstance_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto1.SyncInstance_Status" json:"status,omitempty"`
+	Status      SyncInstance_Status `protobuf:"varint,5,opt,name=status,proto3,enum=proto.SyncInstance_Status" json:"status,omitempty"`
 	HealthCheck *HealthCheck        `protobuf:"bytes,6,opt,name=healthCheck,proto3" json:"healthCheck,omitempty"`
 	Version     string              `protobuf:"bytes,7,opt,name=version,proto3" json:"version,omitempty"`
 	PluginName  string              `protobuf:"bytes,8,opt,name=pluginName,proto3" json:"pluginName,omitempty"`
@@ -337,7 +353,7 @@ type SyncInstance struct {
 }
 
 func (m *SyncInstance) Reset()         { *m = SyncInstance{} }
-func (m *SyncInstance) String() string { return proto1.CompactTextString(m) }
+func (m *SyncInstance) String() string { return proto.CompactTextString(m) }
 func (*SyncInstance) ProtoMessage()    {}
 func (*SyncInstance) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{6}
@@ -413,7 +429,7 @@ type Expansion struct {
 }
 
 func (m *Expansion) Reset()         { *m = Expansion{} }
-func (m *Expansion) String() string { return proto1.CompactTextString(m) }
+func (m *Expansion) String() string { return proto.CompactTextString(m) }
 func (*Expansion) ProtoMessage()    {}
 func (*Expansion) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{7}
@@ -441,7 +457,7 @@ func (m *Expansion) GetLabels() map[string]string {
 }
 
 type HealthCheck struct {
-	Mode     HealthCheck_Modes `protobuf:"varint,1,opt,name=mode,proto3,enum=proto1.HealthCheck_Modes" json:"mode,omitempty"`
+	Mode     HealthCheck_Modes `protobuf:"varint,1,opt,name=mode,proto3,enum=proto.HealthCheck_Modes" json:"mode,omitempty"`
 	Port     int32             `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
 	Interval int32             `protobuf:"varint,3,opt,name=interval,proto3" json:"interval,omitempty"`
 	Times    int32             `protobuf:"varint,4,opt,name=times,proto3" json:"times,omitempty"`
@@ -449,7 +465,7 @@ type HealthCheck struct {
 }
 
 func (m *HealthCheck) Reset()         { *m = HealthCheck{} }
-func (m *HealthCheck) String() string { return proto1.CompactTextString(m) }
+func (m *HealthCheck) String() string { return proto.CompactTextString(m) }
 func (*HealthCheck) ProtoMessage()    {}
 func (*HealthCheck) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{8}
@@ -501,7 +517,7 @@ type MappingEntry struct {
 }
 
 func (m *MappingEntry) Reset()         { *m = MappingEntry{} }
-func (m *MappingEntry) String() string { return proto1.CompactTextString(m) }
+func (m *MappingEntry) String() string { return proto.CompactTextString(m) }
 func (*MappingEntry) ProtoMessage()    {}
 func (*MappingEntry) Descriptor() ([]byte, []int) {
 	return fileDescriptor_9577b640f2aab197, []int{9}
@@ -550,80 +566,81 @@ func (m *MappingEntry) GetCurInstanceID() string {
 }
 
 func init() {
-	proto1.RegisterEnum("proto.SyncService_Status", SyncService_Status_name, SyncService_Status_value)
-	proto1.RegisterEnum("proto.SyncInstance_Status", SyncInstance_Status_name, SyncInstance_Status_value)
-	proto1.RegisterEnum("proto.HealthCheck_Modes", HealthCheck_Modes_name, HealthCheck_Modes_value)
-	proto1.RegisterType((*PullRequest)(nil), "proto.PullRequest")
-	proto1.RegisterType((*IncrementPullRequest)(nil), "proto.IncrementPullRequest")
-	proto1.RegisterType((*DeclareRequest)(nil), "proto.DeclareRequest")
-	proto1.RegisterType((*DeclareResponse)(nil), "proto.DeclareResponse")
-	proto1.RegisterType((*SyncData)(nil), "proto.SyncData")
-	proto1.RegisterType((*SyncService)(nil), "proto.SyncService")
-	proto1.RegisterType((*SyncInstance)(nil), "proto.SyncInstance")
-	proto1.RegisterType((*Expansion)(nil), "proto.Expansion")
-	proto1.RegisterMapType((map[string]string)(nil), "proto.Expansion.LabelsEntry")
-	proto1.RegisterType((*HealthCheck)(nil), "proto.HealthCheck")
-	proto1.RegisterType((*MappingEntry)(nil), "proto.MappingEntry")
-}
-
-func init() { proto1.RegisterFile("syncer.proto1", fileDescriptor_9577b640f2aab197) }
+	proto.RegisterEnum("proto.SyncService_Status", SyncService_Status_name, SyncService_Status_value)
+	proto.RegisterEnum("proto.SyncInstance_Status", SyncInstance_Status_name, SyncInstance_Status_value)
+	proto.RegisterEnum("proto.HealthCheck_Modes", HealthCheck_Modes_name, HealthCheck_Modes_value)
+	proto.RegisterType((*PullRequest)(nil), "proto.PullRequest")
+	proto.RegisterType((*IncrementPullRequest)(nil), "proto.IncrementPullRequest")
+	proto.RegisterType((*DeclareRequest)(nil), "proto.DeclareRequest")
+	proto.RegisterType((*DeclareResponse)(nil), "proto.DeclareResponse")
+	proto.RegisterType((*SyncData)(nil), "proto.SyncData")
+	proto.RegisterType((*SyncService)(nil), "proto.SyncService")
+	proto.RegisterType((*SyncInstance)(nil), "proto.SyncInstance")
+	proto.RegisterType((*Expansion)(nil), "proto.Expansion")
+	proto.RegisterMapType((map[string]string)(nil), "proto.Expansion.LabelsEntry")
+	proto.RegisterType((*HealthCheck)(nil), "proto.HealthCheck")
+	proto.RegisterType((*MappingEntry)(nil), "proto.MappingEntry")
+}
+
+func init() { proto.RegisterFile("syncer.proto", fileDescriptor_9577b640f2aab197) }
 
 var fileDescriptor_9577b640f2aab197 = []byte{
-	// 854 bytes of a gzipped FileDescriptorProto
+	// 868 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xdd, 0x8e, 0xe3, 0x34,
-	0x14, 0x9e, 0xa4, 0xe9, 0xdf, 0x49, 0x77, 0xb6, 0x1c, 0x16, 0x14, 0xca, 0x08, 0x55, 0xd1, 0x0a,
-	0x46, 0x08, 0x2a, 0xb6, 0xec, 0x05, 0x0b, 0x17, 0x08, 0xb6, 0xc3, 0x6e, 0xc5, 0x4c, 0xa7, 0x72,
-	0xa7, 0x20, 0x21, 0x71, 0x91, 0x49, 0xad, 0x36, 0x4c, 0xea, 0x04, 0xdb, 0xa9, 0x98, 0x07, 0x82,
-	0xa7, 0x80, 0x87, 0xe1, 0x96, 0xa7, 0x40, 0x76, 0xdc, 0xd6, 0xe9, 0x54, 0x88, 0x8b, 0xbd, 0x8a,
-	0xfd, 0xf9, 0xcb, 0xf1, 0xf9, 0xf9, 0x7c, 0x0e, 0x74, 0xc4, 0x3d, 0x8b, 0x29, 0x1f, 0xe4, 0x3c,
-	0x93, 0x19, 0xd6, 0xf5, 0x27, 0xfc, 0x19, 0xfc, 0x69, 0x91, 0xa6, 0x84, 0xfe, 0x5a, 0x50, 0x21,
-	0xb1, 0x0f, 0xbe, 0xa0, 0x7c, 0x93, 0xc4, 0x74, 0x12, 0xad, 0x69, 0xe0, 0xf4, 0x9d, 0xf3, 0x36,
-	0xb1, 0x21, 0x0c, 0xa0, 0x99, 0xe5, 0x32, 0xc9, 0x98, 0x08, 0x5c, 0x7d, 0xba, 0xdd, 0x22, 0x82,
-	0x27, 0x93, 0x35, 0x0d, 0x6a, 0x1a, 0xd6, 0xeb, 0xf0, 0x63, 0x78, 0x32, 0x66, 0x31, 0xa7, 0x6b,
-	0xca, 0xa4, 0x7d, 0x0f, 0x82, 0x17, 0x2d, 0x16, 0xdc, 0x5c, 0xa0, 0xd7, 0xe1, 0x53, 0x38, 0x1d,
-	0xd1, 0x38, 0x8d, 0x38, 0xfd, 0x2f, 0xd6, 0x0b, 0x78, 0xbc, 0x63, 0x89, 0x3c, 0x63, 0x82, 0xe2,
-	0x87, 0x70, 0xaa, 0x42, 0x1b, 0x45, 0x32, 0xba, 0xa4, 0x6c, 0x29, 0x57, 0xfa, 0x87, 0x1a, 0x39,
-	0x40, 0xc3, 0x35, 0xb4, 0x66, 0x06, 0xc1, 0x01, 0xb4, 0x4c, 0x54, 0x22, 0x70, 0xfa, 0xb5, 0x73,
-	0x7f, 0x88, 0x65, 0x62, 0x06, 0x8a, 0x32, 0x2b, 0x8f, 0xc8, 0x8e, 0x83, 0xcf, 0xa0, 0x9d, 0x30,
-	0x21, 0x23, 0xa6, 0x7e, 0x70, 0xf5, 0x0f, 0x6f, 0x5b, 0x3f, 0x8c, 0xcd, 0x19, 0xd9, 0xb3, 0xc2,
-	0xbf, 0x5d, 0xf0, 0x2d, 0x63, 0x78, 0x06, 0x6d, 0x63, 0x6e, 0xbc, 0x30, 0x21, 0xed, 0x01, 0xec,
-	0x42, 0x2d, 0xca, 0x73, 0x93, 0x53, 0xb5, 0x54, 0xd1, 0xb3, 0x68, 0x9f, 0x4f, 0x66, 0xb2, 0xbf,
-	0xa1, 0x5c, 0x24, 0x19, 0x0b, 0xbc, 0x32, 0xfb, 0x66, 0x8b, 0xcf, 0xa0, 0x21, 0x64, 0x24, 0x0b,
-	0x11, 0xd4, 0xfb, 0xce, 0xf9, 0xe9, 0xf0, 0xbd, 0x87, 0xe1, 0x0c, 0x66, 0x9a, 0x40, 0x0c, 0x11,
-	0x9f, 0xc2, 0xa3, 0x45, 0xb6, 0x8e, 0x12, 0x36, 0xe5, 0xd9, 0x2f, 0x34, 0x96, 0x41, 0x43, 0x9b,
-	0xac, 0x82, 0x4a, 0x12, 0x94, 0x6d, 0x12, 0x9e, 0x31, 0x55, 0xc4, 0xa0, 0x59, 0x4a, 0xc2, 0x82,
-	0xf0, 0x03, 0x80, 0x3c, 0x2d, 0x96, 0x09, 0xd3, 0x9a, 0x69, 0x69, 0x82, 0x85, 0xe0, 0x67, 0x00,
-	0xf4, 0xb7, 0x3c, 0x62, 0x42, 0xab, 0xa6, 0xad, 0x93, 0xd7, 0x35, 0xee, 0x5d, 0x6c, 0x0f, 0x88,
-	0xc5, 0x09, 0x3f, 0x82, 0x46, 0xe9, 0x2b, 0xfa, 0xd0, 0x9c, 0x4f, 0xbe, 0x9f, 0x5c, 0xff, 0x38,
-	0xe9, 0x9e, 0x60, 0x03, 0xdc, 0xf9, 0xb4, 0xeb, 0x60, 0x0b, 0xbc, 0x91, 0x42, 0xdc, 0xf0, 0xf7,
-	0x1a, 0x74, 0xec, 0xfc, 0x2b, 0x5f, 0xb6, 0x15, 0xd8, 0x65, 0xd9, 0x42, 0xaa, 0x45, 0x70, 0x0f,
-	0x8b, 0x70, 0x06, 0x6d, 0xca, 0x16, 0x79, 0x96, 0x30, 0x29, 0x82, 0x5a, 0xbf, 0xa6, 0x4e, 0x77,
-	0x00, 0xf6, 0xa0, 0xb5, 0xca, 0x84, 0xd4, 0x51, 0x96, 0xd9, 0xdf, 0xed, 0x71, 0x78, 0x90, 0xfe,
-	0xde, 0x11, 0x71, 0x1c, 0xe6, 0xff, 0x39, 0xf8, 0x2b, 0x1a, 0xa5, 0x72, 0xf5, 0x72, 0x45, 0xe3,
-	0x3b, 0x9d, 0xfd, 0xbd, 0x0c, 0x5f, 0xef, 0x4f, 0x88, 0x4d, 0xb3, 0x25, 0xd0, 0xac, 0x4a, 0xe0,
-	0xcd, 0xd7, 0xe1, 0xd5, 0xff, 0xac, 0x03, 0x76, 0xa0, 0x35, 0xbb, 0xf9, 0x86, 0xdc, 0x8c, 0x27,
-	0xaf, 0xba, 0x35, 0xec, 0x42, 0xe7, 0x7a, 0x7e, 0x73, 0xfd, 0xdd, 0xec, 0x82, 0xfc, 0x30, 0x7e,
-	0x79, 0xd1, 0xf5, 0xc2, 0x3f, 0x1c, 0x68, 0xef, 0xae, 0x50, 0xca, 0xbe, 0x4b, 0xd8, 0xb6, 0x3c,
-	0x7a, 0x8d, 0x4f, 0xa0, 0x7e, 0x7b, 0x2f, 0x69, 0xd9, 0x55, 0x3a, 0xa4, 0xdc, 0xe0, 0x73, 0x68,
-	0xa4, 0xd1, 0x2d, 0x4d, 0xcb, 0x6a, 0xf8, 0xc3, 0xb3, 0x43, 0x77, 0x07, 0x97, 0xfa, 0xf8, 0x82,
-	0x49, 0x7e, 0x4f, 0x0c, 0xb7, 0xf7, 0x02, 0x7c, 0x0b, 0x56, 0x4f, 0xeb, 0x8e, 0xde, 0x9b, 0xdb,
-	0xd4, 0x52, 0x5d, 0xb6, 0x89, 0xd2, 0x82, 0x1a, 0x05, 0x94, 0x9b, 0x2f, 0xdd, 0x2f, 0x9c, 0xf0,
-	0x2f, 0x07, 0x7c, 0x2b, 0xf5, 0xf8, 0x09, 0x78, 0xeb, 0x6c, 0x51, 0x76, 0xc2, 0xd3, 0x61, 0xf0,
-	0xb0, 0x38, 0x83, 0xab, 0x6c, 0x41, 0x05, 0xd1, 0x2c, 0x15, 0x58, 0x9e, 0x71, 0xa9, 0xcd, 0xd6,
-	0x89, 0x5e, 0x2b, 0xd5, 0x24, 0x4c, 0x52, 0xbe, 0x89, 0x52, 0xfd, 0x94, 0xeb, 0x64, 0xb7, 0x57,
-	0x7e, 0xa8, 0x36, 0x29, 0xb4, 0x9c, 0xea, 0xa4, 0xdc, 0x28, 0x7f, 0x0b, 0x9e, 0x6a, 0x21, 0xb5,
-	0x89, 0x5a, 0x86, 0xe7, 0x50, 0xd7, 0xd7, 0x54, 0xcb, 0xd0, 0x02, 0x6f, 0x3a, 0x9f, 0xbd, 0x2e,
-	0x0b, 0x31, 0x9d, 0x5f, 0x5e, 0x76, 0xdd, 0xf0, 0x1f, 0x07, 0x3a, 0x57, 0x51, 0x9e, 0x27, 0x6c,
-	0x59, 0x06, 0xdf, 0x07, 0x3f, 0x4e, 0x0b, 0x21, 0x29, 0xb7, 0x3b, 0xba, 0x05, 0x3d, 0x6c, 0x03,
-	0xee, 0xb1, 0x36, 0x10, 0x42, 0x27, 0xe3, 0x4b, 0xd3, 0x49, 0xc6, 0x23, 0xd3, 0x95, 0x2a, 0x98,
-	0xb2, 0x94, 0xf1, 0xe5, 0x56, 0xee, 0xe3, 0x91, 0x79, 0x25, 0x55, 0x50, 0x59, 0x8a, 0x0b, 0xbe,
-	0xb7, 0x54, 0xc6, 0x59, 0xc1, 0x94, 0xa5, 0xb8, 0xe0, 0x96, 0x25, 0xd3, 0x9a, 0x2a, 0xe0, 0xf0,
-	0x4f, 0x07, 0x3c, 0xf5, 0xc0, 0xf0, 0x53, 0xf0, 0xd4, 0x74, 0xc1, 0xed, 0xe3, 0xb1, 0x46, 0x4d,
-	0xef, 0xb1, 0xf5, 0x12, 0x55, 0xeb, 0x0f, 0x4f, 0x70, 0x04, 0x6f, 0x99, 0x19, 0xb2, 0x9f, 0x0e,
-	0xf8, 0x8e, 0xe1, 0x55, 0x67, 0x50, 0xef, 0xdd, 0x43, 0xb8, 0x1c, 0x3a, 0xe1, 0x09, 0x7e, 0x0d,
-	0x8f, 0x2a, 0xb3, 0x0d, 0xdf, 0x37, 0xd4, 0x63, 0x13, 0xef, 0x88, 0x1b, 0xdf, 0xb6, 0x7f, 0x6a,
-	0x0e, 0xbe, 0xd2, 0xe8, 0x6d, 0x43, 0x7f, 0x3e, 0xff, 0x37, 0x00, 0x00, 0xff, 0xff, 0xed, 0xcc,
-	0x45, 0x4e, 0xa4, 0x07, 0x00, 0x00,
+	0x14, 0x9e, 0xa4, 0xff, 0x27, 0xdd, 0xd9, 0x62, 0x96, 0x55, 0x28, 0x23, 0x54, 0x45, 0x2b, 0x98,
+	0x0b, 0xa8, 0xd8, 0xb2, 0x17, 0x2c, 0x5c, 0x20, 0x76, 0x3b, 0xec, 0x56, 0xcc, 0x76, 0x2a, 0x77,
+	0x0a, 0x12, 0x77, 0x99, 0xd4, 0x6a, 0xc3, 0xa4, 0x76, 0xc6, 0x76, 0x2a, 0xe6, 0x81, 0xe0, 0x29,
+	0xe0, 0x61, 0xb8, 0xe5, 0x29, 0x90, 0x7f, 0xda, 0x38, 0x9d, 0x0a, 0x71, 0xc1, 0x55, 0x7c, 0x3e,
+	0x7f, 0x39, 0xf6, 0x39, 0xe7, 0xf3, 0x39, 0xd0, 0x15, 0xf7, 0x34, 0x21, 0x7c, 0x98, 0x73, 0x26,
+	0x19, 0x6a, 0xe8, 0x4f, 0x74, 0x07, 0xc1, 0xac, 0xc8, 0x32, 0x4c, 0xee, 0x0a, 0x22, 0x24, 0x1a,
+	0x40, 0x20, 0x08, 0xdf, 0xa6, 0x09, 0x99, 0xc6, 0x1b, 0x12, 0x7a, 0x03, 0xef, 0xbc, 0x83, 0x5d,
+	0x08, 0x85, 0xd0, 0x62, 0xb9, 0x4c, 0x19, 0x15, 0xa1, 0xaf, 0x77, 0x77, 0x26, 0x42, 0x50, 0x97,
+	0xe9, 0x86, 0x84, 0x35, 0x0d, 0xeb, 0xb5, 0xc2, 0xe2, 0xe5, 0x92, 0x87, 0x75, 0x83, 0xa9, 0x75,
+	0xf4, 0x0a, 0x9e, 0x4c, 0x68, 0xc2, 0xc9, 0x86, 0x50, 0xe9, 0x9e, 0xbd, 0xe3, 0x7a, 0x25, 0x17,
+	0x3d, 0x85, 0x66, 0x46, 0xe8, 0x4a, 0xae, 0xf5, 0x61, 0x35, 0x6c, 0xad, 0xe8, 0x19, 0x9c, 0x8e,
+	0x49, 0x92, 0xc5, 0x9c, 0xfc, 0xcb, 0xdf, 0xd1, 0x4b, 0x78, 0xbc, 0x67, 0x89, 0x9c, 0x51, 0x41,
+	0xd0, 0x27, 0x70, 0xaa, 0xd2, 0x30, 0x8e, 0x65, 0x7c, 0x69, 0x1c, 0x7b, 0xda, 0xf1, 0x01, 0x1a,
+	0x6d, 0xa0, 0x3d, 0xb7, 0x08, 0x1a, 0x42, 0xdb, 0x66, 0x40, 0x84, 0xde, 0xa0, 0x76, 0x1e, 0x8c,
+	0x90, 0x49, 0xe2, 0x50, 0x51, 0xe6, 0x66, 0x0b, 0xef, 0x39, 0xe8, 0x39, 0x74, 0x52, 0x2a, 0x64,
+	0x4c, 0xd5, 0x0f, 0xbe, 0xfe, 0xe1, 0x7d, 0xe7, 0x87, 0x89, 0xdd, 0xc3, 0x25, 0x2b, 0xfa, 0xcb,
+	0x87, 0xc0, 0x71, 0x86, 0xce, 0xa0, 0x63, 0xdd, 0x4d, 0x96, 0x36, 0xa4, 0x12, 0x40, 0x3d, 0xa8,
+	0xc5, 0x79, 0x6e, 0xf3, 0xaf, 0x96, 0x2a, 0x7a, 0x1a, 0x97, 0xb9, 0xa7, 0xb6, 0x52, 0x5b, 0xc2,
+	0x45, 0xca, 0xa8, 0x4d, 0xff, 0xce, 0x44, 0xcf, 0xa1, 0x29, 0x64, 0x2c, 0x0b, 0x11, 0x36, 0x06,
+	0xde, 0xf9, 0xe9, 0xe8, 0xc3, 0x87, 0xe1, 0x0c, 0xe7, 0x9a, 0x80, 0x2d, 0x11, 0x3d, 0x83, 0x47,
+	0x4b, 0xb6, 0x89, 0x53, 0x3a, 0xe3, 0xec, 0x17, 0x92, 0xc8, 0xb0, 0xa9, 0x5d, 0x56, 0x41, 0x25,
+	0x1f, 0x42, 0xb7, 0x29, 0x67, 0x54, 0x15, 0x37, 0x6c, 0x19, 0xf9, 0x38, 0x10, 0xfa, 0x18, 0x20,
+	0xcf, 0x8a, 0x55, 0x4a, 0xb5, 0xbe, 0xda, 0x9a, 0xe0, 0x20, 0xe8, 0x0b, 0x00, 0xf2, 0x6b, 0x1e,
+	0x53, 0xa1, 0x15, 0xd6, 0xd1, 0xc9, 0xeb, 0xd9, 0xeb, 0x5d, 0xec, 0x36, 0xb0, 0xc3, 0x89, 0x3e,
+	0x85, 0xa6, 0xb9, 0x2b, 0x0a, 0xa0, 0xb5, 0x98, 0xfe, 0x30, 0xbd, 0xfa, 0x69, 0xda, 0x3b, 0x41,
+	0x4d, 0xf0, 0x17, 0xb3, 0x9e, 0x87, 0xda, 0x50, 0x1f, 0x2b, 0xc4, 0x8f, 0x7e, 0xab, 0x41, 0xd7,
+	0xcd, 0xbf, 0xba, 0xcb, 0xae, 0x02, 0xfb, 0x2c, 0x3b, 0x48, 0xb5, 0x08, 0xfe, 0x61, 0x11, 0xce,
+	0xa0, 0x43, 0xe8, 0x32, 0x67, 0x29, 0x95, 0x22, 0xac, 0x0d, 0x6a, 0x6a, 0x77, 0x0f, 0xa0, 0x3e,
+	0xb4, 0xd7, 0x4c, 0x48, 0x1d, 0xa5, 0xc9, 0xfe, 0xde, 0x46, 0xa3, 0x83, 0xf4, 0xf7, 0x8f, 0x88,
+	0xe3, 0x30, 0xff, 0x2f, 0x20, 0x58, 0x93, 0x38, 0x93, 0xeb, 0xd7, 0x6b, 0x92, 0xdc, 0xea, 0xec,
+	0x97, 0x32, 0x7c, 0x5b, 0xee, 0x60, 0x97, 0xe6, 0x4a, 0xa0, 0x55, 0x95, 0xc0, 0xff, 0x5f, 0x87,
+	0x37, 0xff, 0xb1, 0x0e, 0xa8, 0x0b, 0xed, 0xf9, 0xf5, 0x77, 0xf8, 0x7a, 0x32, 0x7d, 0xd3, 0xab,
+	0xa1, 0x1e, 0x74, 0xaf, 0x16, 0xd7, 0x57, 0xdf, 0xcf, 0x2f, 0xf0, 0x8f, 0x93, 0xd7, 0x17, 0xbd,
+	0x7a, 0xf4, 0xbb, 0x07, 0x9d, 0xfd, 0x11, 0x4a, 0xd9, 0xb7, 0x29, 0xdd, 0x95, 0x47, 0xaf, 0xd1,
+	0x13, 0x68, 0xdc, 0xdc, 0x4b, 0x62, 0x3a, 0x50, 0x17, 0x1b, 0x03, 0xbd, 0x80, 0x66, 0x16, 0xdf,
+	0x90, 0xcc, 0x54, 0x23, 0x18, 0x9d, 0x1d, 0x5e, 0x77, 0x78, 0xa9, 0xb7, 0x2f, 0xa8, 0xe4, 0xf7,
+	0xd8, 0x72, 0xfb, 0x2f, 0x21, 0x70, 0x60, 0xf5, 0xb4, 0x6e, 0xc9, 0xbd, 0x3d, 0x4d, 0x2d, 0xd5,
+	0x61, 0xdb, 0x38, 0x2b, 0x88, 0x55, 0x80, 0x31, 0xbe, 0xf6, 0xbf, 0xf2, 0xa2, 0x3f, 0x3d, 0x08,
+	0x9c, 0xd4, 0xa3, 0xcf, 0xa0, 0xbe, 0x61, 0x4b, 0xd3, 0x35, 0x4f, 0x47, 0xe1, 0xc3, 0xe2, 0x0c,
+	0xdf, 0xb1, 0x25, 0x11, 0x58, 0xb3, 0x54, 0x60, 0x39, 0xe3, 0x52, 0xbb, 0x6d, 0x60, 0xbd, 0x56,
+	0xaa, 0x49, 0xa9, 0x24, 0x7c, 0x1b, 0x67, 0xfa, 0x29, 0x37, 0xf0, 0xde, 0x56, 0xf7, 0x50, 0x2d,
+	0x55, 0x68, 0x39, 0x35, 0xb0, 0x31, 0xd4, 0x7d, 0x0b, 0x9e, 0x69, 0x21, 0x75, 0xb0, 0x5a, 0x46,
+	0xe7, 0xd0, 0xd0, 0xc7, 0x54, 0xcb, 0xd0, 0x86, 0xfa, 0x6c, 0x31, 0x7f, 0x6b, 0x0a, 0x31, 0x5b,
+	0x5c, 0x5e, 0xf6, 0xfc, 0xe8, 0x6f, 0x0f, 0xba, 0xef, 0xe2, 0x3c, 0x4f, 0xe9, 0xca, 0x04, 0x3f,
+	0x80, 0x20, 0xc9, 0x0a, 0x21, 0x09, 0x77, 0xbb, 0xbf, 0x03, 0x3d, 0x6c, 0x03, 0xfe, 0xb1, 0x36,
+	0x10, 0x41, 0x97, 0xf1, 0x95, 0xed, 0x24, 0x93, 0xb1, 0xed, 0x4a, 0x15, 0x4c, 0x79, 0x62, 0x7c,
+	0xb5, 0x93, 0xfb, 0x64, 0x6c, 0x5f, 0x49, 0x15, 0x54, 0x9e, 0x92, 0x82, 0x97, 0x9e, 0x4c, 0x9c,
+	0x15, 0x4c, 0x79, 0x4a, 0x0a, 0xee, 0x78, 0xb2, 0xad, 0xa9, 0x02, 0x8e, 0xfe, 0xf0, 0xa0, 0xae,
+	0x1e, 0x18, 0xfa, 0x1c, 0xea, 0x6a, 0xea, 0xa0, 0xdd, 0xe3, 0x71, 0x46, 0x50, 0xff, 0xb1, 0xf3,
+	0x12, 0x55, 0xeb, 0x8f, 0x4e, 0xd0, 0x18, 0xde, 0xb3, 0x33, 0xa4, 0x9c, 0x0e, 0xe8, 0x03, 0xcb,
+	0xab, 0xce, 0xa0, 0xfe, 0xd3, 0x43, 0xd8, 0x0c, 0x9d, 0xe8, 0x04, 0x7d, 0x0b, 0x8f, 0x2a, 0x33,
+	0x0f, 0x7d, 0x64, 0xa9, 0xc7, 0x26, 0xe1, 0x91, 0x6b, 0xbc, 0xea, 0xfc, 0xdc, 0x1a, 0x7e, 0xa3,
+	0xd1, 0x9b, 0xa6, 0xfe, 0x7c, 0xf9, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x37, 0x82, 0xde, 0xa9,
+	0xd0, 0x07, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -763,56 +780,3 @@ var _Sync_serviceDesc = grpc.ServiceDesc{
 	Streams:  []grpc.StreamDesc{},
 	Metadata: "syncer.proto",
 }
-
-func init() { proto1.RegisterFile("syncer.proto", fileDescriptor0) }
-
-var fileDescriptor0 = []byte{
-	// 751 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x6e, 0xdb, 0x48,
-	0x0c, 0x8e, 0x64, 0xf9, 0x8f, 0xf2, 0x66, 0x85, 0xd9, 0x3d, 0x68, 0x8d, 0x60, 0x61, 0x08, 0x0b,
-	0xac, 0x0f, 0xbb, 0x46, 0xe3, 0xa6, 0x40, 0xdb, 0x5b, 0x11, 0xbb, 0x89, 0xd1, 0xc4, 0x31, 0xc6,
-	0x71, 0x7b, 0xea, 0x41, 0x91, 0x07, 0xb6, 0x1a, 0x79, 0xa4, 0x6a, 0x46, 0x46, 0xfd, 0x40, 0xed,
-	0x5b, 0xf4, 0x61, 0x7a, 0xed, 0x53, 0x14, 0xf3, 0x63, 0x7b, 0xe4, 0xe4, 0xd0, 0x43, 0x4f, 0x26,
-	0x3f, 0x52, 0x1c, 0x92, 0x1f, 0x49, 0x43, 0x8b, 0x6d, 0x68, 0x44, 0xf2, 0x5e, 0x96, 0xa7, 0x3c,
-	0x45, 0x55, 0xf9, 0x13, 0xbc, 0x07, 0x77, 0x52, 0x24, 0x09, 0x26, 0x1f, 0x0b, 0xc2, 0x38, 0xea,
-	0x80, 0xcb, 0x48, 0xbe, 0x8e, 0x23, 0x32, 0x0e, 0x57, 0xc4, 0xb7, 0x3a, 0x56, 0xb7, 0x89, 0x4d,
-	0x08, 0xf9, 0x50, 0x4f, 0x33, 0x1e, 0xa7, 0x94, 0xf9, 0xb6, 0xb4, 0x6e, 0x55, 0x84, 0xc0, 0xe1,
-	0xf1, 0x8a, 0xf8, 0x15, 0x09, 0x4b, 0x39, 0x58, 0x41, 0x63, 0xba, 0xa1, 0xd1, 0x20, 0xe4, 0x21,
-	0xea, 0x41, 0x43, 0x07, 0x62, 0xbe, 0xd5, 0xa9, 0x74, 0xdd, 0x3e, 0x52, 0xb9, 0xf4, 0x84, 0xcb,
-	0x54, 0x99, 0xf0, 0xce, 0x07, 0x9d, 0x42, 0x33, 0xa6, 0x8c, 0x87, 0x54, 0x7c, 0x60, 0xcb, 0x0f,
-	0xfe, 0x30, 0x3e, 0x18, 0x69, 0x1b, 0xde, 0x7b, 0x05, 0xdf, 0x6c, 0x70, 0x8d, 0x60, 0xe8, 0x04,
-	0x9a, 0x3a, 0xdc, 0x68, 0xae, 0x8b, 0xd9, 0x03, 0xc8, 0x83, 0x4a, 0x98, 0x65, 0xba, 0x0c, 0x21,
-	0x8a, 0x12, 0x68, 0xb8, 0x2f, 0x81, 0xea, 0x82, 0xd7, 0x24, 0x67, 0x71, 0x4a, 0x7d, 0x47, 0x15,
-	0xac, 0x55, 0x74, 0x0a, 0x35, 0xc6, 0x43, 0x5e, 0x30, 0xbf, 0xda, 0xb1, 0xba, 0xc7, 0xfd, 0xbf,
-	0x1e, 0x96, 0xd3, 0x9b, 0x4a, 0x07, 0xac, 0x1d, 0xd1, 0x3f, 0xf0, 0xdb, 0x3c, 0x5d, 0x85, 0x31,
-	0x9d, 0xe4, 0xe9, 0x07, 0x12, 0x71, 0xbf, 0x26, 0x43, 0x96, 0x41, 0xc1, 0x02, 0xa1, 0xeb, 0x38,
-	0x4f, 0xe9, 0x8a, 0x50, 0xee, 0xd7, 0x15, 0x0b, 0x06, 0x84, 0xfe, 0x06, 0xc8, 0x92, 0x62, 0x11,
-	0x53, 0x49, 0x53, 0x43, 0x3a, 0x18, 0x08, 0x7a, 0x02, 0x40, 0x3e, 0x65, 0x21, 0x65, 0x92, 0xa8,
-	0xa6, 0x6c, 0x9e, 0xa7, 0xd3, 0x1b, 0x6e, 0x0d, 0xd8, 0xf0, 0x09, 0xfe, 0x85, 0x9a, 0xca, 0x15,
-	0xb9, 0x50, 0x9f, 0x8d, 0xdf, 0x8c, 0x6f, 0xde, 0x8d, 0xbd, 0x23, 0x54, 0x03, 0x7b, 0x36, 0xf1,
-	0x2c, 0xd4, 0x00, 0x67, 0x20, 0x10, 0x3b, 0xf8, 0x5c, 0x81, 0x96, 0xd9, 0x7f, 0x91, 0xcb, 0x96,
-	0x81, 0x5d, 0x97, 0x0d, 0xa4, 0x4c, 0x82, 0x7d, 0x48, 0xc2, 0x09, 0x34, 0x09, 0x9d, 0x67, 0x69,
-	0x4c, 0x39, 0xf3, 0x2b, 0x9d, 0x8a, 0xb0, 0xee, 0x00, 0xd4, 0x86, 0xc6, 0x32, 0x65, 0x5c, 0x56,
-	0xa9, 0xba, 0xbf, 0xd3, 0x51, 0xff, 0xa0, 0xfd, 0xed, 0x47, 0x86, 0xe3, 0xb0, 0xff, 0x67, 0xe0,
-	0x2e, 0x49, 0x98, 0xf0, 0xe5, 0xf9, 0x92, 0x44, 0xf7, 0xb2, 0xfb, 0xfb, 0x31, 0xbc, 0xdc, 0x5b,
-	0xb0, 0xe9, 0x66, 0x8e, 0x40, 0xbd, 0x3c, 0x02, 0xbf, 0x9e, 0x87, 0x8b, 0x9f, 0xe4, 0x01, 0xb5,
-	0xa0, 0x31, 0xbd, 0x7d, 0x85, 0x6f, 0x47, 0xe3, 0x0b, 0xaf, 0x82, 0x3c, 0x68, 0xdd, 0xcc, 0x6e,
-	0x6f, 0x5e, 0x4f, 0x87, 0xf8, 0xed, 0xe8, 0x7c, 0xe8, 0x39, 0xc1, 0x17, 0x0b, 0x9a, 0xbb, 0x27,
-	0xc4, 0x64, 0xdf, 0xc7, 0x74, 0x4b, 0x8f, 0x94, 0xd1, 0x9f, 0x50, 0xbd, 0xdb, 0x70, 0xa2, 0x16,
-	0xb9, 0x85, 0x95, 0x82, 0xce, 0xa0, 0x96, 0x84, 0x77, 0x24, 0x51, 0x6c, 0xb8, 0xfd, 0x93, 0xc3,
-	0x74, 0x7b, 0x57, 0xd2, 0x3c, 0xa4, 0x3c, 0xdf, 0x60, 0xed, 0xdb, 0x7e, 0x01, 0xae, 0x01, 0x8b,
-	0xd5, 0xba, 0x27, 0x1b, 0xfd, 0x9a, 0x10, 0xc5, 0x63, 0xeb, 0x30, 0x29, 0x88, 0x9e, 0x00, 0xa5,
-	0xbc, 0xb4, 0x9f, 0x5b, 0xc1, 0x57, 0x0b, 0x5c, 0xa3, 0xf5, 0xe8, 0x3f, 0x70, 0x56, 0xe9, 0x5c,
-	0x1d, 0x9f, 0xe3, 0xbe, 0xff, 0x90, 0x9c, 0xde, 0x75, 0x3a, 0x27, 0x0c, 0x4b, 0x2f, 0x51, 0x58,
-	0x96, 0xe6, 0x5c, 0x86, 0xad, 0x62, 0x29, 0x8b, 0xa9, 0x89, 0x29, 0x27, 0xf9, 0x3a, 0x4c, 0xe4,
-	0x2a, 0x57, 0xf1, 0x4e, 0x17, 0x79, 0x88, 0xcb, 0xc4, 0xe4, 0x38, 0x55, 0xb1, 0x52, 0x44, 0xbe,
-	0x45, 0x9e, 0xc8, 0x41, 0x6a, 0x62, 0x21, 0x06, 0x5d, 0xa8, 0xca, 0x67, 0xca, 0x34, 0x34, 0xc0,
-	0x99, 0xcc, 0xa6, 0x97, 0x8a, 0x88, 0xc9, 0xec, 0xea, 0xca, 0xb3, 0x83, 0xef, 0x16, 0xb4, 0xae,
-	0xc3, 0x2c, 0x8b, 0xe9, 0x42, 0x15, 0xdf, 0x01, 0x37, 0x4a, 0x0a, 0xc6, 0x49, 0x6e, 0x1e, 0x51,
-	0x03, 0x7a, 0x78, 0x06, 0xec, 0xc7, 0xce, 0x40, 0x00, 0xad, 0x34, 0x5f, 0xe8, 0x4b, 0x32, 0x1a,
-	0xe8, 0xab, 0x54, 0xc2, 0x44, 0xa4, 0x34, 0x5f, 0x6c, 0xc7, 0x7d, 0x34, 0xd0, 0x5b, 0x52, 0x06,
-	0x45, 0xa4, 0xa8, 0xc8, 0xf7, 0x91, 0x54, 0x9d, 0x25, 0x4c, 0x44, 0x8a, 0x8a, 0xdc, 0x88, 0xa4,
-	0x4f, 0x53, 0x09, 0xec, 0x3f, 0x03, 0x47, 0xec, 0x17, 0xfa, 0x1f, 0x1c, 0xf1, 0xbf, 0x81, 0xb6,
-	0xbb, 0x63, 0xfc, 0x89, 0xb4, 0x7f, 0x37, 0x16, 0x51, 0x5c, 0xfe, 0xe0, 0xe8, 0xae, 0x26, 0x91,
-	0xa7, 0x3f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x75, 0xe0, 0xda, 0xd0, 0x84, 0x06, 0x00, 0x00,
-}
diff --git a/syncer/proto/syncer.proto b/syncer/proto/syncer.proto
index ea55e55..66a563f 100644
--- a/syncer/proto/syncer.proto
+++ b/syncer/proto/syncer.proto
@@ -6,10 +6,12 @@ message PullRequest {
     string serviceName = 1;
     string options = 2;
     string time = 3;
+    string addr = 4;
 }
 
 message IncrementPullRequest {
     string addr = 1;
+    int64 length = 2;
 }
 
 message DeclareRequest {
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index 6d0e744..0716db4 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -21,10 +21,9 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
-	"math"
 	"strconv"
-	"time"
 
+	"github.com/apache/servicecomb-service-center/pkg/dump"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/tlsutil"
 	"github.com/apache/servicecomb-service-center/pkg/util"
@@ -37,6 +36,7 @@ const (
 	EventDiscovered       = "discovered"
 	EventIncrementPulled  = "incrementPulled"
 	EventNotifyFullPulled = "notifyFullPulled"
+	ChannelBufferSize     = 1000
 )
 
 // tickHandler Timed task handler
@@ -70,69 +70,16 @@ func (s *Server) tickHandler() {
 	}
 }
 
-func (s *Server) DataRemoveTickHandler() chan bool {
-	ticker := time.NewTicker(time.Second * 30)
-	stopChan := make(chan bool)
-	go func(trick *time.Ticker) {
-		//defer ticker.Stop()
-		for {
-			select {
-			case <-ticker.C:
-				s.eventQueueDataRemoveTickHandler()
-				log.Info(fmt.Sprintf("size of records map = %d, size of events slice = %d", len(s.revisionMap), len(s.eventQueue)))
-			case stop := <-stopChan:
-				if stop {
-					log.Info("data remove ticker stop")
-					return
-				}
-			case <-context.Background().Done():
-				return
-			}
-		}
-	}(ticker)
-	return stopChan
-}
-
-func (s *Server) eventQueueDataRemoveTickHandler() {
-	if len(s.revisionMap) == 0 || len(s.eventQueue) == 0 {
-		log.Info("RevisionMap or EventQueue is empty")
-		return
-	}
-
-	log.Info(fmt.Sprintf("length of map : %d", len(s.revisionMap)))
-	var minRevision int64 = math.MaxInt64
-	for _, value := range s.revisionMap {
-		var tempRevision = value.revision
-		if tempRevision < minRevision {
-			minRevision = tempRevision
-		}
-	}
-
-	log.Info(fmt.Sprintf("revision of item will remove : %d", minRevision))
-
-	j := 0
-	for _, value := range s.eventQueue {
-		var tempRevision = value.Revision
-		if tempRevision == minRevision {
-			break
-		}
-		j++
-	}
-	s.eventQueue = s.eventQueue[j:]
-	log.Info(fmt.Sprintf("size of event queue : %d", len(s.eventQueue)))
-	if len(s.eventQueue) > 0 {
-		log.Info(fmt.Sprintf("revision of first element in event queue : %d", s.eventQueue[0].Revision))
-	}
-}
-
 // Pull returns sync data of servicecenter
-func (s *Server) Pull(context.Context, *pb.PullRequest) (*pb.SyncData, error) {
+func (s *Server) Pull(ctx context.Context, req *pb.PullRequest) (*pb.SyncData, error) {
+	if _, ok := s.channelMap[req.GetAddr()]; !ok {
+		s.channelMap[req.GetAddr()] = make(chan *dump.WatchInstanceChangedEvent, ChannelBufferSize)
+	}
 	return s.servicecenter.Discovery(), nil
 }
 
 func (s *Server) IncrementPull(ctx context.Context, req *pb.IncrementPullRequest) (*pb.SyncData, error) {
-	incrementQueue := s.GetIncrementQueue(req.GetAddr())
-	s.updateRevisionMap(req.GetAddr(), incrementQueue)
+	incrementQueue := s.GetIncrementQueue(req)
 	return s.GetIncrementData(ctx, incrementQueue), nil
 }
 
@@ -179,7 +126,7 @@ func (s *Server) userEvent(data ...[]byte) (success bool) {
 	}
 
 	cli := client.NewSyncClient(endpoint, tlsConfig)
-	syncData, err := cli.Pull(context.Background())
+	syncData, err := cli.Pull(context.Background(), s.conf.Listener.RPCAddr)
 	if err != nil {
 		log.Errorf(err, "Pull other serf instances failed, node name is '%s'", members[0].Name)
 		return
@@ -234,7 +181,8 @@ func (s *Server) incrementUserEvent(data ...[]byte) (success bool) {
 	syncDataLength := declareResponse.SyncDataLength
 
 	if syncDataLength != 0 {
-		syncData, err := cli.IncrementPull(context.Background(), s.conf.Listener.RPCAddr)
+		syncData, err := cli.IncrementPull(
+			context.Background(), &pb.IncrementPullRequest{Addr: s.conf.Listener.RPCAddr, Length: syncDataLength})
 		if err != nil {
 			log.Error(fmt.Sprintf("IncrementPull other serf instances failed, node name is '%s'", members[0].Name), err)
 			return
diff --git a/syncer/server/handler_test.go b/syncer/server/handler_test.go
index 72e7d24..fd0ee12 100644
--- a/syncer/server/handler_test.go
+++ b/syncer/server/handler_test.go
@@ -3,12 +3,8 @@ package server
 import (
 	"context"
 	"errors"
-	"fmt"
-	"log"
 	"strconv"
-	"sync"
 	"testing"
-	"time"
 
 	"github.com/apache/servicecomb-service-center/pkg/dump"
 	"github.com/apache/servicecomb-service-center/syncer/config"
@@ -20,57 +16,14 @@ import (
 
 var s Server
 
-func TestServer_DataRemoveTickHandler(t *testing.T) {
-	log.Print("start")
-	var actions = []string{"CREATE", "UPDATE", "DELETE"}
-	var s Server
-
-	var recoders = make(map[string]record, 9)
-	var events = make([]*dump.WatchInstanceChangedEvent, 10)
-	s.revisionMap = recoders
-	s.eventQueue = events
-
-	for i := 2; i < 10; i++ {
-		var recoder = new(record)
-		recoder.revision = int64(i)
-		s.revisionMap[strconv.FormatInt(int64(i), 10)] = *recoder
-	}
-	log.Printf("size of records map = %d", len(s.revisionMap))
-
-	for i := 0; i < 10; i++ {
-		var event = new(dump.WatchInstanceChangedEvent)
-		event.Revision = int64(i)
-
-		event.Action = actions[i%3]
-
-		s.eventQueue[i] = event
-	}
-
-	removeMapElement(s.revisionMap, 2)
-	t.Run("start update queue stop after 15 second", func(t *testing.T) {
-		ch := s.DataRemoveTickHandler()
-		time.Sleep(15 * time.Second)
-		ch <- true
-		close(ch)
-	})
-
-}
-
 func TestServer_IncrementPull(t *testing.T) {
 	confCreate()
-	s.revisionMap, s.eventQueue = getMapAndQueue(9, 10)
-	t.Run("increment when address is new", func(t *testing.T) {
-		iPReq := pb.IncrementPullRequest{
-			Addr: "171.0.0.1",
-		}
-		syncData, err := s.IncrementPull(context.Background(), &iPReq)
-		assert.NoError(t, err, "no error when DeclareDataLength")
-		assert.NotEmpty(t, syncData, "increase succeed")
-	})
+	s.channelMap = setChannel()
 
 	t.Run("increment when address exist", func(t *testing.T) {
 		iPReq := pb.IncrementPullRequest{
-			Addr: "1",
+			Addr:   "1",
+			Length: 1,
 		}
 		syncData, err := s.IncrementPull(context.Background(), &iPReq)
 		assert.NoError(t, err, "no error when DeclareDataLength")
@@ -79,17 +32,7 @@ func TestServer_IncrementPull(t *testing.T) {
 }
 
 func TestServer_DeclareDataLength(t *testing.T) {
-	s.revisionMap, s.eventQueue = getMapAndQueue(9, 10)
-	t.Run("run with new address", func(t *testing.T) {
-		dReq := pb.DeclareRequest{
-			Addr: "http://127.0.0.1",
-		}
-		declareResp, err := s.DeclareDataLength(context.Background(), &dReq)
-
-		assert.NoError(t, err, "error when DeclareDataLength")
-		assert.Empty(t, err, "declareResp.SyncDataLength is empty")
-		assert.NotZero(t, declareResp.SyncDataLength, "declareResp.SyncDataLength is empty")
-	})
+	s.channelMap = setChannel()
 	t.Run("when address exist", func(t *testing.T) {
 		dReq := pb.DeclareRequest{
 			Addr: "3",
@@ -118,33 +61,24 @@ func TestService_incrementUserEvent(t *testing.T) {
 	})
 }
 
-func getMapAndQueue(mapSize int, queueSize int) (map[string]record, []*dump.WatchInstanceChangedEvent) {
-	var actions = []string{"CREATE", "UPDATE", "DELETE"}
-
-	var recoders = make(map[string]record, mapSize)
-	var events = make([]*dump.WatchInstanceChangedEvent, queueSize)
-
-	for i := 2; i < queueSize; i++ {
-		var recoder = new(record)
-		recoder.revision = int64(i)
-
-		recoder.action = actions[i%3]
-
-		recoders[strconv.FormatInt(int64(i), 10)] = *recoder
-	}
-	log.Printf("size of records map = %d", len(s.revisionMap))
-
-	for i := 0; i < queueSize; i++ {
-		var event = new(dump.WatchInstanceChangedEvent)
-		var event1 = instanceAndServiceCreate(i)
-		event.Revision = int64(i)
-		event.Action = actions[i%3]
-		event.Service = event1.Service
-		event.Instance = event1.Instance
-
-		events[i] = event
+func setChannel() map[string]chan *dump.WatchInstanceChangedEvent {
+	var channelMap = make(map[string]chan *dump.WatchInstanceChangedEvent)
+	var ch1 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+	var ch2 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+	var ch3 = make(chan *dump.WatchInstanceChangedEvent, 1000)
+	channelMap["1"] = ch1
+	channelMap["2"] = ch2
+	channelMap["3"] = ch3
+
+	var event1 = instanceAndServiceCreate(1)
+
+	for _, ch := range channelMap {
+		select {
+		case ch <- event1:
+		default:
+		}
 	}
-	return recoders, events
+	return channelMap
 }
 
 func confCreate() {
@@ -262,7 +196,6 @@ func instanceAndServiceCreate(i int) *dump.WatchInstanceChangedEvent {
 	is.Value = iv
 	is.Rev = int64(i)
 
-	event.Revision = int64(i)
 	event.Instance = is
 	event.Service = ss
 
@@ -277,20 +210,3 @@ func defaultServer() *serf.Server {
 		serf.WithBindPort(35151),
 	)
 }
-
-func removeMapElement(events map[string]record, start int64) {
-	var mutex sync.Mutex
-	testTicker := time.NewTicker(time.Second * 1)
-	go func(tick *time.Ticker) {
-		for k := start; k < 10; k++ {
-			<-testTicker.C
-			mutex.Lock()
-			delete(events, strconv.FormatInt(int64(k), 10))
-			mutex.Unlock()
-			fmt.Println("remove element in map")
-		}
-	}(testTicker)
-	if len(events) == 0 {
-		defer testTicker.Stop()
-	}
-}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 8d5ccbd..7212dee 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -20,7 +20,6 @@ package server
 import (
 	"context"
 	"errors"
-	"fmt"
 	"strconv"
 	"sync"
 	"syscall"
@@ -86,38 +85,26 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
-	revisionMap map[string]record
-
-	eventQueue []*dump.WatchInstanceChangedEvent
-
-	mapLock sync.RWMutex
-
-	queueLock sync.RWMutex
-
 	mux sync.RWMutex
 
 	triggered bool
 
+	channelMap map[string]chan *dump.WatchInstanceChangedEvent
+
 	// The channel will be closed when receiving a system interrupt signal
 	stopCh chan struct{}
 }
 
-type record struct {
-	revision int64
-	action   string
-}
-
 // NewServer new server with Config
 func NewServer(conf *config.Config) *Server {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &Server{
-		ctx:         ctx,
-		cancel:      cancel,
-		conf:        conf,
-		stopCh:      make(chan struct{}),
-		revisionMap: make(map[string]record),
-		eventQueue:  make([]*dump.WatchInstanceChangedEvent, 0),
-		triggered:   true,
+		ctx:        ctx,
+		cancel:     cancel,
+		conf:       conf,
+		stopCh:     make(chan struct{}),
+		triggered:  true,
+		channelMap: make(map[string]chan *dump.WatchInstanceChangedEvent),
 	}
 }
 
@@ -151,8 +138,6 @@ func (s *Server) Run(ctx context.Context) {
 
 	s.task.Handle(s.tickHandler)
 
-	s.DataRemoveTickHandler()
-
 	s.task.Run(ctx)
 
 	go s.NewHTTPServer()
@@ -302,6 +287,13 @@ func (s *Server) watchInstance() error {
 	return nil
 }
 
+func instFromOtherSC(instance *dump.Instance, m *pb.MappingEntry) bool {
+	if instance.Value.InstanceId == m.CurInstanceID && m.OrgInstanceID != "" {
+		return true
+	}
+	return false
+}
+
 func (s *Server) addToQueue(event *dump.WatchInstanceChangedEvent) {
 	mapping := s.servicecenter.GetSyncMapping()
 
@@ -313,96 +305,52 @@ func (s *Server) addToQueue(event *dump.WatchInstanceChangedEvent) {
 		}
 	}
 
-	s.queueLock.Lock()
-	s.eventQueue = append(s.eventQueue, event)
-	log.Debugf("success add instance event to queue:%s   len:%s", event, len(s.eventQueue))
-	s.queueLock.Unlock()
-}
-
-func instFromOtherSC(instance *dump.Instance, m *pb.MappingEntry) bool {
-	if instance.Value.InstanceId == m.CurInstanceID && m.OrgInstanceID != "" {
-		return true
-	}
-	return false
-}
-
-func (s *Server) getRevision(addr string) int64 {
-	s.mapLock.RLock()
-	value, ok := s.revisionMap[addr]
-	s.mapLock.RUnlock()
-	if ok {
-		return value.revision
+	for _, ch := range s.channelMap {
+		select {
+		case ch <- event:
+			log.Info("add event to queue")
+		default:
+			log.Info("channel buffer is full")
+		}
 	}
-	return -1
 }
 
-func (s *Server) getAction(addr string) string {
-	s.mapLock.RLock()
-	value, ok := s.revisionMap[addr]
-	s.mapLock.RUnlock()
+func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) {
+	ch, ok := s.channelMap[addr]
+	var length int64
 	if ok {
-		return value.action
+		length = int64(len(ch))
+	} else {
+		length = 0
+		log.Error("fail to  find the specific channel according to the addr", utils.ErrChannelSearch)
 	}
-	return ""
-}
-
-func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) {
 	response = &pb.DeclareResponse{
-		SyncDataLength: int64(len(s.GetIncrementQueue(addr))),
+		SyncDataLength: length,
 	}
 	return response
 }
 
-func (s *Server) updateRevisionMap(addr string, incrementQueue []*dump.WatchInstanceChangedEvent) {
-	if len(incrementQueue) == 0 {
-		log.Info("incrementQueue is empty, no need to update RevisionMap")
-		return
-	}
-
-	log.Debug(fmt.Sprintf("update RevisionMap, addr = %s", addr))
-	s.mapLock.Lock()
-	s.revisionMap[addr] = record{
-		incrementQueue[len(incrementQueue)-1].Revision,
-		incrementQueue[len(incrementQueue)-1].Action,
-	}
-	s.mapLock.Unlock()
-}
-
-func (s *Server) GetIncrementQueue(addr string) []*dump.WatchInstanceChangedEvent {
-	revision := s.getRevision(addr)
-	action := s.getAction(addr)
-
-	s.queueLock.RLock()
-	defer s.queueLock.RUnlock()
-
-	length := len(s.eventQueue)
-	if length == 0 {
-		log.Info("eventQueue is empty")
+func (s *Server) GetIncrementQueue(req *pb.IncrementPullRequest) (queue []*dump.WatchInstanceChangedEvent) {
+	ch, ok := s.channelMap[req.GetAddr()]
+	if !ok {
+		log.Debug("fail to find the queue according to the addr")
 		return nil
 	}
 
-	if revision == -1 {
-		return s.eventQueue
-	}
-
-	index := 0
-	for _, event := range s.eventQueue {
-		if event.Revision == revision && event.Action == action {
-			break
+	queue = make([]*dump.WatchInstanceChangedEvent, 0, len(ch))
+	for i := 0; i < int(req.GetLength()); i++ {
+		select {
+		case temp, ok := <-ch:
+			if !ok {
+				log.Debug("channel closed")
+				return
+			}
+			queue = append(queue, temp)
+		default:
+			return
 		}
-		index++
-	}
-
-	if index == length-1 {
-		log.Info("no incremental event in the queue")
-		return nil
-	}
-
-	if index == length {
-		log.Info(fmt.Sprintf("fail to find the event in the queue by RevisionMap, Revision = %d, Action = %s", revision, action))
-		return s.eventQueue
 	}
-	return s.eventQueue[index+1:]
+	return
 }
 
 func (s *Server) GetIncrementData(ctx context.Context, incrementQueue []*dump.WatchInstanceChangedEvent) (data *pb.SyncData) {