Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2020/11/24 07:16:02 UTC

[GitHub] [servicecomb-service-center] lilai23 opened a new pull request #757: Incremental synchronization of sc: sync between sc syncers

lilai23 opened a new pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757


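   This commit covers the part of the sc incremental synchronization feature that synchronizes incremental data between the syncers of each region. The implementation is as follows:
   
   1. Add a user event of type incrementPulled. Other syncers that receive this user event establish a grpc connection and run a "handshake protocol": first pre-declare the length of the incremental synchronization data, then transfer the incremental data.
   2. The pre-declaration method looks up the incremental data in the syncer's incremental synchronization queue eventQueue according to revisionMap and returns its length.
   3. The incremental synchronization method converts eventQueue into syncData, returns it, and updates revisionMap.
   4. After receiving syncData, the synchronizing syncer performs the corresponding registration, unregistration, and similar operations according to the action field in each instance, which completes the incremental data synchronization.
   5. When the actual length of the incremental synchronization data does not match the pre-declared length, an http interface is provided that, when triggered, makes every pair of syncers complete one full synchronization. (This also applies when a new syncer node joins.)
   6. A scheduled task removes from eventQueue all incremental data before the minimum revision, that is, data that the other syncers have already synchronized.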
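
The steps above can be sketched roughly as follows. This is a minimal in-memory illustration under stated assumptions, not the PR's code: the `event` and `queue` types and the `declare`, `pull`, `prune` and `verify` names are hypothetical, and the real syncers exchange these values over grpc rather than through direct method calls.

```go
package main

import "fmt"

// event is a hypothetical stand-in for one queued instance change.
type event struct {
	Revision int64
	Action   string // for example "CREATE" or "DELETE"
}

// queue is a hypothetical model of eventQueue plus revisionMap: it records,
// per peer cluster, the last revision that peer has already pulled.
type queue struct {
	events   []event          // ordered by ascending Revision
	lastSeen map[string]int64 // peer cluster name -> last pulled revision
}

// pending returns the events the given peer has not pulled yet.
func (q *queue) pending(peer string) []event {
	var out []event
	for _, e := range q.events {
		if e.Revision > q.lastSeen[peer] {
			out = append(out, e)
		}
	}
	return out
}

// declare pre-declares how many events the peer should expect (step 2).
func (q *queue) declare(peer string) int {
	return len(q.pending(peer))
}

// pull returns the pending events and advances the peer's revision (step 3).
func (q *queue) pull(peer string) []event {
	out := q.pending(peer)
	if n := len(out); n > 0 {
		q.lastSeen[peer] = out[n-1].Revision
	}
	return out
}

// prune drops events at or below the minimum revision pulled by all
// known peers, since every peer already has them (step 6).
func (q *queue) prune() {
	if len(q.lastSeen) == 0 {
		return
	}
	first := true
	var minRev int64
	for _, rev := range q.lastSeen {
		if first || rev < minRev {
			minRev, first = rev, false
		}
	}
	kept := q.events[:0]
	for _, e := range q.events {
		if e.Revision > minRev {
			kept = append(kept, e)
		}
	}
	q.events = kept
}

// verify reports whether the pulled data matches the declared length;
// a mismatch is the case that calls for a full synchronization (step 5).
func verify(declared int, pulled []event) bool {
	return declared == len(pulled)
}

func main() {
	q := &queue{
		events:   []event{{1, "CREATE"}, {2, "CREATE"}, {3, "DELETE"}},
		lastSeen: map[string]int64{"region-b": 1},
	}
	declared := q.declare("region-b")
	pulled := q.pull("region-b")
	fmt.Println(declared, len(pulled), verify(declared, pulled))
	q.prune()
	fmt.Println(len(q.events))
}
```

Declaring the length before transferring gives the receiver a cheap consistency check: if events were added or lost between the two calls, the counts differ and the receiver can fall back to a full synchronization instead of applying a partial update.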


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r531820693



##########
File path: syncer/client/sync_client_test.go
##########
@@ -0,0 +1,17 @@
+package client
+
+import (
+	"context"
+	"crypto/tls"
+	"testing"
+)
+
+var c = NewSyncClient("", new(tls.Config))
+
+func TestClient_IncrementPull(t *testing.T) {
+	c.IncrementPull(context.Background(), "http://127.0.0.1")

Review comment:
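       Nothing is actually asserted here, so this has no value as a unit test. Think about what to assert; a test that only checks whether the code compiles is meaningless.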
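
As an illustration of the kind of assertions being asked for (not code from the PR), the sketch below checks both the returned data and the error path. Everything in it is hypothetical: the `transport` interface, the simplified `client`, the `stubTransport`, the `errEmptyAddr` value and the address are assumptions made so the example is self-contained, and the real client talks grpc.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// transport is a hypothetical seam standing in for the grpc connection.
type transport interface {
	Pull(ctx context.Context, addr string) ([]string, error)
}

// client is a hypothetical, simplified sync client.
type client struct{ t transport }

var errEmptyAddr = errors.New("empty address")

// IncrementPull validates addr and then delegates to the transport.
func (c *client) IncrementPull(ctx context.Context, addr string) ([]string, error) {
	if addr == "" {
		return nil, errEmptyAddr
	}
	return c.t.Pull(ctx, addr)
}

// stubTransport returns canned data so the check needs no network.
type stubTransport struct{ instances []string }

func (s stubTransport) Pull(context.Context, string) ([]string, error) {
	return s.instances, nil
}

// checkIncrementPull holds the assertions. In a _test.go file each
// returned error would be a t.Fatalf call instead.
func checkIncrementPull() error {
	c := &client{t: stubTransport{instances: []string{"inst-1", "inst-2"}}}

	got, err := c.IncrementPull(context.Background(), "127.0.0.1:30100")
	if err != nil {
		return fmt.Errorf("unexpected error: %w", err)
	}
	if len(got) != 2 || got[0] != "inst-1" || got[1] != "inst-2" {
		return fmt.Errorf("unexpected instances: %v", got)
	}

	// The failure path must be asserted too, not just the happy path.
	if _, err := c.IncrementPull(context.Background(), ""); !errors.Is(err, errEmptyAddr) {
		return fmt.Errorf("want errEmptyAddr, got %v", err)
	}
	return nil
}

func main() {
	fmt.Println(checkIncrementPull())
}
```

Injecting the transport is one way to make the client testable without a running peer; a test against a local in-process server would be another.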







[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532317820



##########
File path: syncer/server/server.go
##########
@@ -85,22 +86,38 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
+	revisionMap map[string]record
+
 	eventQueue []*dump.WatchInstanceChangedEvent
 
+	mapLock sync.RWMutex
+
 	queueLock sync.RWMutex
+

Review comment:
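       The syncer contains services such as serf and etcd. The syncer's Server receives data from sc and synchronizes it. Before this PR, the server already implemented the full synchronization logic; this PR adds incremental synchronization to it and makes that the primary synchronization mechanism. The new httpserver lets operations staff manually trigger one full synchronization when incremental synchronization raises an error alarm.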







[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r531822099



##########
File path: syncer/servicecenter/servicecenter.go
##########
@@ -118,3 +120,89 @@ func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) {
 func (s *servicecenter) Discovery() *pb.SyncData {
 	return s.storage.GetData()
 }
+
+func (s *servicecenter) IncrementRegistry(clusterName string, data *pb.SyncData) {
+	mapping := s.storage.GetMapByCluster(clusterName)
+	for _, inst := range data.Instances {
+		svc := searchService(inst, data.Services)
+		if svc == nil {
+			err := errors.New("service does not exist")
+			log.Errorf(err, "servicecenter.Registry, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+			continue
+		}
+
+		// If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID
+		svcID := s.createService(svc)
+		log.Debugf("create service success orgServiceID= %s, curServiceID = %s", inst.ServiceId, svcID)

Review comment:
       Debugf: do not use logging functions whose names end in f; they are now prohibited. Use fmt. inside the call instead.
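
One reading of this comment is that the message should be formatted with `fmt.Sprintf` and passed to the non-formatting log call. The sketch below shows that pattern under that assumption; the `logger` type is a hypothetical stand-in and not the project's `pkg/log` API.

```go
package main

import (
	"fmt"
	"strings"
)

// logger is a hypothetical logger that exposes only a non-formatting Debug.
type logger struct{ out *strings.Builder }

// Debug writes an already formatted message.
func (l *logger) Debug(msg string) {
	l.out.WriteString("DEBUG " + msg + "\n")
}

// createServiceMessage builds the log text up front with fmt.Sprintf.
func createServiceMessage(orgID, curID string) string {
	return fmt.Sprintf("create service success orgServiceID = %s, curServiceID = %s", orgID, curID)
}

func main() {
	l := &logger{out: &strings.Builder{}}
	// Instead of l.Debugf("... %s ...", a, b), format first and log the result.
	l.Debug(createServiceMessage("svc-1", "svc-9"))
	fmt.Print(l.out.String())
}
```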

##########
File path: syncer/servicecenter/servicecenter.go
##########
@@ -118,3 +120,89 @@ func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) {
 func (s *servicecenter) Discovery() *pb.SyncData {
 	return s.storage.GetData()
 }
+
+func (s *servicecenter) IncrementRegistry(clusterName string, data *pb.SyncData) {
+	mapping := s.storage.GetMapByCluster(clusterName)
+	for _, inst := range data.Instances {
+		svc := searchService(inst, data.Services)
+		if svc == nil {
+			err := errors.New("service does not exist")
+			log.Errorf(err, "servicecenter.Registry, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+			continue
+		}
+
+		// If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID
+		svcID := s.createService(svc)
+		log.Debugf("create service success orgServiceID= %s, curServiceID = %s", inst.ServiceId, svcID)
+
+		matches := pb.Expansions(inst.Expansions).Find("action", map[string]string{})
+		if len(matches) != 1 {
+			err := errors.New("action invalid")
+			log.Errorf(err, "can not handle invalid action")
+			continue
+		}
+		action := string(matches[0].Bytes[:])
+
+		if action == string(discovery.EVT_CREATE) {
+			log.Debugf("trying to do registration of instance, instanceID = %s", inst.InstanceId)
+
+			// If inst is in the mapping, just heart beat it in servicecenter
+			if s.heartbeatInstances(mapping, inst) {
+				continue
+			}
+
+			item := &pb.MappingEntry{
+				ClusterName:   clusterName,
+				DomainProject: svc.DomainProject,
+				OrgServiceID:  svc.ServiceId,
+				OrgInstanceID: inst.InstanceId,
+				CurServiceID:  svcID,
+				CurInstanceID: s.registryInstances(svc.DomainProject, svcID, inst),
+			}
+
+			// Use new serviceID and instanceID to update mapping data in this servicecenter
+			if item.CurInstanceID != "" {
+				mapping = append(mapping, item)
+			}
+		}
+
+		if action == string(discovery.EVT_DELETE) {
+			log.Debugf("trying to do unRegistration of instance, instanceID = %s", inst.InstanceId)
+			if len(mapping) == 0 {
+				err := errors.New("mapping does not exist")
+				log.Errorf(err, "fail to handle unregister")
+				return
+			}
+
+			index := 0
+			ctx := context.Background()
+
+			for _, val := range mapping {
+				if val.OrgInstanceID == inst.InstanceId {
+					err := s.servicecenter.UnregisterInstance(ctx, val.DomainProject, val.CurServiceID, val.CurInstanceID)
+					if err != nil {
+						log.Errorf(err, "Servicecenter delete instance failed")
+					}
+					log.Debugf("Unregistered instance, InstanceID = %s", val.CurInstanceID)
+					break
+				}
+				index++
+			}
+
+			switch {
+			case len(mapping) == 1:
+				mapping = nil
+			case index == 0:
+				mapping = mapping[index+1:]
+			case index == len(mapping)-1:
+				mapping = mapping[:index]
+			case index == len(mapping):
+				err := errors.New("the instance is not in the mapping")

Review comment:
       Extract this into a package-level var ErrXXXX.







[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532306737



##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true

Review comment:
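       The scheduled task tickhandle checks this triggered flag; if it is true, it triggers one full synchronization and then sets the flag back to false, so there may be a race.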
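
The race described here can be avoided by reading and resetting the flag inside one critical section. The sketch below illustrates that idea only; the `fullSyncFlag` type and its `Trigger` and `Consume` methods are hypothetical names, not the PR's fields.

```go
package main

import (
	"fmt"
	"sync"
)

// fullSyncFlag is a hypothetical wrapper around the triggered flag.
type fullSyncFlag struct {
	mu        sync.Mutex
	triggered bool
}

// Trigger is what the http handler would call to request a full sync.
func (f *fullSyncFlag) Trigger() {
	f.mu.Lock()
	f.triggered = true
	f.mu.Unlock()
}

// Consume is what the tick handler would call. It reports whether a full
// sync was requested and resets the flag under the same lock, so a request
// cannot slip in between the check and the reset.
func (f *fullSyncFlag) Consume() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	was := f.triggered
	f.triggered = false
	return was
}

func main() {
	var f fullSyncFlag
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.Trigger()
		}()
	}
	wg.Wait()
	// Many triggers collapse into a single pending full sync.
	fmt.Println(f.Consume(), f.Consume())
}
```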







[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532042947



##########
File path: syncer/server/handler.go
##########
@@ -45,9 +50,78 @@ func (s *Server) tickHandler() {
 	s.servicecenter.FlushData()
 
 	// sends a UserEvent on Serf, the event will be broadcast between members
-	err := s.serf.UserEvent(EventDiscovered, util.StringToBytesWithNoCopy(s.conf.Cluster))
-	if err != nil {
-		log.Errorf(err, "Syncer send user event failed")
+	s.httpserver.Mux.Lock()
+	defer s.httpserver.Mux.Unlock()
+	if s.httpserver.Triggered {
+		err := s.serf.UserEvent(EventNotifyFullPulled, util.StringToBytesWithNoCopy(""))
+		s.httpserver.Triggered = false
+		if err != nil {
+			log.Errorf(err, "Syncer send notifyFullPulled user event failed")
+		}
+		err = alarm.Clear(alarm.IDIncrementPullError)
+		if err != nil {
+			log.Error("", err)
+		}
+	} else {
+		err := s.serf.UserEvent(EventIncrementPulled, util.StringToBytesWithNoCopy(s.conf.Cluster))
+		if err != nil {
+			log.Errorf(err, "Syncer send incrementPulled user event failed")
+		}
+	}
+}
+
+func (s *Server) DataRemoveTickHandler() chan bool {
+	ticker := time.NewTicker(time.Second * 30)
+	stopChan := make(chan bool)
+	go func(trick *time.Ticker) {
+		//defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				s.eventQueueDataRemoveTickHandler()
+				log.Infof("size of records map = %d, size of events slice = %d", len(s.revisionMap), len(s.eventQueue))
+			case stop := <-stopChan:
+				if stop {
+					fmt.Print("ready stop")

Review comment:
       Fixed.







[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532311791



##########
File path: cmd/syncer/conf/microservice.yaml
##########
@@ -0,0 +1,3 @@
+servicecomb:
+  service:
+    name: FullSyncronization

Review comment:
       Fixed.

##########
File path: syncer/server/handler.go
##########
@@ -104,3 +188,73 @@ func (s *Server) userEvent(data ...[]byte) (success bool) {
 	s.servicecenter.Registry(clusterName, syncData)
 	return true
 }
+
+func (s *Server) incrementUserEvent(data ...[]byte) (success bool) {
+	log.Debug("Receive serf user event")
+	clusterName := util.BytesToStringWithNoCopy(data[0])
+
+	// Excludes notifications from self, as the gossip protocol inevitably has redundant notifications
+	if s.conf.Cluster == clusterName {
+		return
+	}
+
+	tags := map[string]string{tagKeyClusterName: clusterName}
+	// Get member information and get synchronized data from it
+	members := s.serf.MembersByTags(tags)
+	if len(members) == 0 {
+		log.Warnf("serf member = %s is not found", clusterName)

Review comment:
       Sorry, I missed Warnf when searching for these calls. Fixed.

##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true
+	s.mux.Unlock()
+	err := b.Write([]byte(Message))
+	if err != nil {
+		log.Error("", err)
+	}
+	return
+}
+
+func (s *Server) URLPatterns() []rf.Route {
+	return []rf.Route{
+		{Method: http.MethodGet, Path: "/v1/syncer/full_sync", ResourceFunc: s.FullSync},

Review comment:
       Changed to /v1/syncer/full-synchronization.

##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true
+	s.mux.Unlock()
+	err := b.Write([]byte(Message))
+	if err != nil {
+		log.Error("", err)
+	}
+	return
+}
+
+func (s *Server) URLPatterns() []rf.Route {
+	return []rf.Route{
+		{Method: http.MethodGet, Path: "/v1/syncer/full_sync", ResourceFunc: s.FullSync},
+	}
+}
+
+//if you use go run main.go instead of binary run, plz export CHASSIS_HOME=/{path}/{to}/server/
+
+func (s *Server) NewHttpServer() {
+	chassis.RegisterSchema("rest", s)
+	if err := chassis.Init(); err != nil {
+		openlog.Fatal("Init failed." + err.Error())

Review comment:
       Fixed.







[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532305088



##########
File path: cmd/syncer/conf/microservice.yaml
##########
@@ -0,0 +1,3 @@
+servicecomb:
+  service:
+    name: FullSyncronization

Review comment:
       Could it simply be called Syncer?

##########
File path: syncer/server/handler.go
##########
@@ -104,3 +188,73 @@ func (s *Server) userEvent(data ...[]byte) (success bool) {
 	s.servicecenter.Registry(clusterName, syncData)
 	return true
 }
+
+func (s *Server) incrementUserEvent(data ...[]byte) (success bool) {
+	log.Debug("Receive serf user event")
+	clusterName := util.BytesToStringWithNoCopy(data[0])
+
+	// Excludes notifications from self, as the gossip protocol inevitably has redundant notifications
+	if s.conf.Cluster == clusterName {
+		return
+	}
+
+	tags := map[string]string{tagKeyClusterName: clusterName}
+	// Get member information and get synchronized data from it
+	members := s.serf.MembersByTags(tags)
+	if len(members) == 0 {
+		log.Warnf("serf member = %s is not found", clusterName)

Review comment:
       This call ends in f.

##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true

Review comment:
       What is the purpose of taking the lock here?

##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true
+	s.mux.Unlock()
+	err := b.Write([]byte(Message))
+	if err != nil {
+		log.Error("", err)
+	}
+	return
+}
+
+func (s *Server) URLPatterns() []rf.Route {
+	return []rf.Route{
+		{Method: http.MethodGet, Path: "/v1/syncer/full_sync", ResourceFunc: s.FullSync},
+	}
+}
+
+//if you use go run main.go instead of binary run, plz export CHASSIS_HOME=/{path}/{to}/server/
+
+func (s *Server) NewHttpServer() {
+	chassis.RegisterSchema("rest", s)
+	if err := chassis.Init(); err != nil {
+		openlog.Fatal("Init failed." + err.Error())

Review comment:
       Do not mix logging tools; a project should use only one.

##########
File path: syncer/server/http.go
##########
@@ -0,0 +1,44 @@
+package server
+
+import (
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/go-chassis/go-chassis/v2"
+	rf "github.com/go-chassis/go-chassis/v2/server/restful"
+	"github.com/go-chassis/openlog"
+	"net/http"
+)
+
+const (
+	Message = "Deliver full synchronization task success!"
+)
+
+func (s *Server) FullSync(b *rf.Context) {
+	s.mux.Lock()
+	s.triggered = true
+	s.mux.Unlock()
+	err := b.Write([]byte(Message))
+	if err != nil {
+		log.Error("", err)
+	}
+	return
+}
+
+func (s *Server) URLPatterns() []rf.Route {
+	return []rf.Route{
+		{Method: http.MethodGet, Path: "/v1/syncer/full_sync", ResourceFunc: s.FullSync},

Review comment:
       Do not use an underscore; use a hyphen. Also, sync is a verb here, and a noun must be used.

##########
File path: syncer/server/server.go
##########
@@ -85,22 +86,38 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
+	revisionMap map[string]record
+
 	eventQueue []*dump.WatchInstanceChangedEvent
 
+	mapLock sync.RWMutex
+
 	queueLock sync.RWMutex
+

Review comment:
       What is this Server for? go chassis supports managing multiple servers, so do not write a separate http server.







[GitHub] [servicecomb-service-center] tianxiaoliang merged pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang merged pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757


   





[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532043045



##########
File path: syncer/servicecenter/servicecenter.go
##########
@@ -118,3 +120,89 @@ func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) {
 func (s *servicecenter) Discovery() *pb.SyncData {
 	return s.storage.GetData()
 }
+
+func (s *servicecenter) IncrementRegistry(clusterName string, data *pb.SyncData) {
+	mapping := s.storage.GetMapByCluster(clusterName)
+	for _, inst := range data.Instances {
+		svc := searchService(inst, data.Services)
+		if svc == nil {
+			err := errors.New("service does not exist")
+			log.Errorf(err, "servicecenter.Registry, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+			continue
+		}
+
+		// If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID
+		svcID := s.createService(svc)
+		log.Debugf("create service success orgServiceID= %s, curServiceID = %s", inst.ServiceId, svcID)

Review comment:
       Fixed.

##########
File path: syncer/servicecenter/servicecenter.go
##########
@@ -118,3 +120,89 @@ func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) {
 func (s *servicecenter) Discovery() *pb.SyncData {
 	return s.storage.GetData()
 }
+
+func (s *servicecenter) IncrementRegistry(clusterName string, data *pb.SyncData) {
+	mapping := s.storage.GetMapByCluster(clusterName)
+	for _, inst := range data.Instances {
+		svc := searchService(inst, data.Services)
+		if svc == nil {
+			err := errors.New("service does not exist")
+			log.Errorf(err, "servicecenter.Registry, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+			continue
+		}
+
+		// If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID
+		svcID := s.createService(svc)
+		log.Debugf("create service success orgServiceID= %s, curServiceID = %s", inst.ServiceId, svcID)
+
+		matches := pb.Expansions(inst.Expansions).Find("action", map[string]string{})
+		if len(matches) != 1 {
+			err := errors.New("action invalid")
+			log.Errorf(err, "can not handle invalid action")
+			continue
+		}
+		action := string(matches[0].Bytes[:])
+
+		if action == string(discovery.EVT_CREATE) {
+			log.Debugf("trying to do registration of instance, instanceID = %s", inst.InstanceId)
+
+			// If inst is in the mapping, just heart beat it in servicecenter
+			if s.heartbeatInstances(mapping, inst) {
+				continue
+			}
+
+			item := &pb.MappingEntry{
+				ClusterName:   clusterName,
+				DomainProject: svc.DomainProject,
+				OrgServiceID:  svc.ServiceId,
+				OrgInstanceID: inst.InstanceId,
+				CurServiceID:  svcID,
+				CurInstanceID: s.registryInstances(svc.DomainProject, svcID, inst),
+			}
+
+			// Use new serviceID and instanceID to update mapping data in this servicecenter
+			if item.CurInstanceID != "" {
+				mapping = append(mapping, item)
+			}
+		}
+
+		if action == string(discovery.EVT_DELETE) {
+			log.Debugf("trying to do unRegistration of instance, instanceID = %s", inst.InstanceId)
+			if len(mapping) == 0 {
+				err := errors.New("mapping does not exist")
+				log.Errorf(err, "fail to handle unregister")
+				return
+			}
+
+			index := 0
+			ctx := context.Background()
+
+			for _, val := range mapping {
+				if val.OrgInstanceID == inst.InstanceId {
+					err := s.servicecenter.UnregisterInstance(ctx, val.DomainProject, val.CurServiceID, val.CurInstanceID)
+					if err != nil {
+						log.Errorf(err, "Servicecenter delete instance failed")
+					}
+					log.Debugf("Unregistered instance, InstanceID = %s", val.CurInstanceID)
+					break
+				}
+				index++
+			}
+
+			switch {
+			case len(mapping) == 1:
+				mapping = nil
+			case index == 0:
+				mapping = mapping[index+1:]
+			case index == len(mapping)-1:
+				mapping = mapping[:index]
+			case index == len(mapping):
+				err := errors.New("the instance is not in the mapping")

Review comment:
       Fixed.







[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532043003



##########
File path: etc/conf/syncer.yaml
##########
@@ -36,6 +36,11 @@ registry:
   tlsMount:
     enabled: false
     name: servicecenter
+httpConfig:

Review comment:
       The httpserver has been reworked.

##########
File path: etc/conf/syncer.yaml
##########
@@ -36,6 +36,11 @@ registry:
   tlsMount:
     enabled: false
     name: servicecenter
+httpConfig:
+  # Address to trigger full synchronization task
+  httpAddr: 0.0.0.0:30300

Review comment:
       The httpserver has been reworked.

##########
File path: syncer/client/sync_client_test.go
##########
@@ -0,0 +1,17 @@
+package client
+
+import (
+	"context"
+	"crypto/tls"
+	"testing"
+)
+
+var c = NewSyncClient("", new(tls.Config))
+
+func TestClient_IncrementPull(t *testing.T) {
+	c.IncrementPull(context.Background(), "http://127.0.0.1")

Review comment:
       Fixed.

##########
File path: syncer/http/http.go
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package http
+
+import (
+	"compress/gzip"
+	"context"
+	"github.com/NYTimes/gziphandler"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+	"io"
+	"net/http"
+	"sync"
+)
+
+const (
+	Path    = "/v1/syncer/notify"
+	Message = "Deliver full synchronization task success!"
+)
+
+// Server struct
+type Server struct {
+	server    *http.Server

Review comment:
       The httpserver has been reworked.







[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r531820479



##########
File path: etc/conf/syncer.yaml
##########
@@ -36,6 +36,11 @@ registry:
   tlsMount:
     enabled: false
     name: servicecenter
+httpConfig:
+  # Address to trigger full synchronization task
+  httpAddr: 0.0.0.0:30300

Review comment:
       The first half of httpAddr is redundant.

##########
File path: etc/conf/syncer.yaml
##########
@@ -36,6 +36,11 @@ registry:
   tlsMount:
     enabled: false
     name: servicecenter
+httpConfig:

Review comment:
       This is already config, so the second half of httpConfig is redundant.

##########
File path: etc/conf/syncer.yaml
##########
@@ -36,6 +36,11 @@ registry:
   tlsMount:
     enabled: false
     name: servicecenter
+httpConfig:
+  # Address to trigger full synchronization task
+  httpAddr: 0.0.0.0:30300

Review comment:
       Also, this is a listen address, so name it explicitly: listenAddress.







[GitHub] [servicecomb-service-center] tianxiaoliang commented on pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#issuecomment-735047057


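   There are conflicts. I will merge the code once the go chassis rework is finished, because much of the code here will no longer be needed after that rework.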





[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532318084



##########
File path: syncer/server/server.go
##########
@@ -85,22 +86,38 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
+	revisionMap map[string]record
+
 	eventQueue []*dump.WatchInstanceChangedEvent
 
+	mapLock sync.RWMutex
+
 	queueLock sync.RWMutex
+

Review comment:
       It receives data from sc and synchronizes it with the syncers in other regions.







[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r531820359



##########
File path: syncer/server/handler.go
##########
@@ -45,9 +50,78 @@ func (s *Server) tickHandler() {
 	s.servicecenter.FlushData()
 
 	// sends a UserEvent on Serf, the event will be broadcast between members
-	err := s.serf.UserEvent(EventDiscovered, util.StringToBytesWithNoCopy(s.conf.Cluster))
-	if err != nil {
-		log.Errorf(err, "Syncer send user event failed")
+	s.httpserver.Mux.Lock()
+	defer s.httpserver.Mux.Unlock()
+	if s.httpserver.Triggered {
+		err := s.serf.UserEvent(EventNotifyFullPulled, util.StringToBytesWithNoCopy(""))
+		s.httpserver.Triggered = false
+		if err != nil {
+			log.Errorf(err, "Syncer send notifyFullPulled user event failed")
+		}
+		err = alarm.Clear(alarm.IDIncrementPullError)
+		if err != nil {
+			log.Error("", err)
+		}
+	} else {
+		err := s.serf.UserEvent(EventIncrementPulled, util.StringToBytesWithNoCopy(s.conf.Cluster))
+		if err != nil {
+			log.Errorf(err, "Syncer send incrementPulled user event failed")
+		}
+	}
+}
+
+func (s *Server) DataRemoveTickHandler() chan bool {
+	ticker := time.NewTicker(time.Second * 30)
+	stopChan := make(chan bool)
+	go func(trick *time.Ticker) {
+		//defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				s.eventQueueDataRemoveTickHandler()
+				log.Infof("size of records map = %d, size of events slice = %d", len(s.revisionMap), len(s.eventQueue))
+			case stop := <-stopChan:
+				if stop {
+					fmt.Print("ready stop")

Review comment:
       Use the logging tool here. Check whether this occurs anywhere else and fix all of them.




----------------------------------------------------------------



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r531821054



##########
File path: syncer/http/http.go
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package http
+
+import (
+	"compress/gzip"
+	"context"
+	"github.com/NYTimes/gziphandler"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+	"io"
+	"net/http"
+	"sync"
+)
+
+const (
+	Path    = "/v1/syncer/notify"
+	Message = "Deliver full synchronization task success!"
+)
+
+// Server struct
+type Server struct {
+	server    *http.Server

Review comment:
       I don't think there is any need to implement an HTTP server ourselves; syncer should be developed with go chassis.




----------------------------------------------------------------



[GitHub] [servicecomb-service-center] lilai23 commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
lilai23 commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532307456



##########
File path: syncer/server/server.go
##########
@@ -85,22 +86,38 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
+	revisionMap map[string]record
+
 	eventQueue []*dump.WatchInstanceChangedEvent
 
+	mapLock sync.RWMutex
+
 	queueLock sync.RWMutex
+

Review comment:
       This server is the original server for the whole syncer. It already existed in full before this commit; it was not written separately.




----------------------------------------------------------------



[GitHub] [servicecomb-service-center] tianxiaoliang commented on a change in pull request #757: Incremental synchronization of sc: sync between sc syncers

Posted by GitBox <gi...@apache.org>.
tianxiaoliang commented on a change in pull request #757:
URL: https://github.com/apache/servicecomb-service-center/pull/757#discussion_r532316206



##########
File path: syncer/server/server.go
##########
@@ -85,22 +86,38 @@ type Server struct {
 	// Wraps the grpc server
 	grpc *grpc.Server
 
+	revisionMap map[string]record
+
 	eventQueue []*dump.WatchInstanceChangedEvent
 
+	mapLock sync.RWMutex
+
 	queueLock sync.RWMutex
+

Review comment:
       Then what are you using this server for now?




----------------------------------------------------------------