You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/11/08 13:23:24 UTC

[servicecomb-service-center] branch nzx updated: 用于双集群引擎同步debug-v2 (#1356)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch nzx
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/nzx by this push:
     new a2189be7 用于双集群引擎同步debug-v2 (#1356)
a2189be7 is described below

commit a2189be71ea3401afa9916cd2b750544067e00c7
Author: kkf1 <46...@users.noreply.github.com>
AuthorDate: Tue Nov 8 21:23:18 2022 +0800

    用于双集群引擎同步debug-v2 (#1356)
---
 syncer/service/task/manager.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 1072ca9e..93e9d099 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -186,6 +186,7 @@ func (m *manager) ListTasks(ctx context.Context) ([]*carisync.Task, error) {
 	noHandleTasks := make([]*carisync.Task, 0, len(tasks))
 	skipTaskIDs := make([]string, 0, len(tasks))
 	for _, t := range tasks {
+		log.Info(fmt.Sprintf("list task id: %v", t.ID))
 		_, ok := m.cache.Load(t.ID)
 		if ok {
 			skipTaskIDs = append(skipTaskIDs, t.ID)
@@ -194,6 +195,7 @@ func (m *manager) ListTasks(ctx context.Context) ([]*carisync.Task, error) {
 		m.cache.Store(t.ID, t)
 
 		noHandleTasks = append(noHandleTasks, t)
+		log.Info(fmt.Sprintf("no handle task id: %v", t.ID))
 	}
 
 	log.Info(fmt.Sprintf("load task raw count %d, to handle count %d, skip ids %v",
@@ -259,7 +261,7 @@ func (m *manager) handleResult(res *event.Result) {
 
 	log.Info(fmt.Sprintf("key: %s,result: %v", res.ID, res.Data))
 
-	t, ok := m.cache.LoadAndDelete(res.ID)
+	t, ok := m.cache.Load(res.ID)
 	if !ok {
 		return
 	}
@@ -272,6 +274,8 @@ func (m *manager) handleResult(res *event.Result) {
 			log.Error("delete task failed", err)
 		}
 	}
+
+	m.cache.Delete(res.ID)
 }
 
 func (m *manager) handleTasks(sts syncTasks) {