You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/01 07:34:23 UTC

[incubator-devlake] branch main updated: fix(tapd): add incremental support (#3632)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new a7bb7ef5 fix(tapd): add incremental support (#3632)
a7bb7ef5 is described below

commit a7bb7ef568843488eb40b7f5f7c83ef1cedc9ee5
Author: Warren Chen <yi...@merico.dev>
AuthorDate: Tue Nov 1 15:34:19 2022 +0800

    fix(tapd): add incremental support (#3632)
    
    closes #3631
---
 plugins/tapd/tasks/bug_commit_collector.go   |  6 ------
 plugins/tapd/tasks/story_bug_collector.go    | 26 ++++++++++++++++++++++++++
 plugins/tapd/tasks/story_commit_collector.go |  6 ------
 plugins/tapd/tasks/task_commit_collector.go  |  6 ------
 4 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/plugins/tapd/tasks/bug_commit_collector.go b/plugins/tapd/tasks/bug_commit_collector.go
index 1c0ac014..5129db2f 100644
--- a/plugins/tapd/tasks/bug_commit_collector.go
+++ b/plugins/tapd/tasks/bug_commit_collector.go
@@ -47,7 +47,6 @@ func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect issueCommits")
-	num := 0
 	since := data.Since
 	incremental := false
 	if since == nil {
@@ -103,11 +102,6 @@ func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
 			var data struct {
 				Stories []json.RawMessage `json:"data"`
 			}
-			if len(data.Stories) > 0 {
-				fmt.Println(len(data.Stories))
-				num += len(data.Stories)
-				fmt.Printf("num is %d", num)
-			}
 			err := helper.UnmarshalResponse(res, &data)
 			return data.Stories, err
 		},
diff --git a/plugins/tapd/tasks/story_bug_collector.go b/plugins/tapd/tasks/story_bug_collector.go
index 764299b1..18db3416 100644
--- a/plugins/tapd/tasks/story_bug_collector.go
+++ b/plugins/tapd/tasks/story_bug_collector.go
@@ -18,10 +18,13 @@ limitations under the License.
 package tasks
 
 import (
+	goerror "errors"
 	"fmt"
 	"github.com/apache/incubator-devlake/errors"
+	"gorm.io/gorm"
 	"net/url"
 	"reflect"
+	"time"
 
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/core/dal"
@@ -38,11 +41,33 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect storyBugs")
+	since := data.Since
+	incremental := false
+	if since == nil {
+		// the user didn't specify a time range to sync, so try to load the latest record from the database
+		var latestUpdated models.TapdStoryCommit
+		clauses := []dal.Clause{
+			dal.Where("connection_id = ? and workspace_id = ?", data.Options.ConnectionId, data.Options.WorkspaceId),
+			dal.Orderby("created DESC"),
+		}
+		err := db.First(&latestUpdated, clauses...)
+		if err != nil && !goerror.Is(err, gorm.ErrRecordNotFound) {
+			return errors.NotFound.Wrap(err, "failed to get latest tapd changelog record")
+		}
+		if latestUpdated.Id > 0 {
+			since = (*time.Time)(latestUpdated.Created)
+			incremental = true
+		}
+	}
 
 	clauses := []dal.Clause{
+		dal.Select("id"),
 		dal.From(&models.TapdStory{}),
 		dal.Where("connection_id = ? and workspace_id = ?", data.Options.ConnectionId, data.Options.WorkspaceId),
 	}
+	if since != nil {
+		clauses = append(clauses, dal.Where("modified > ?", since))
+	}
 
 	cursor, err := db.Cursor(clauses...)
 	if err != nil {
@@ -55,6 +80,7 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
 	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
 		RawDataSubTaskArgs: *rawDataSubTaskArgs,
 		ApiClient:          data.ApiClient,
+		Incremental:        incremental,
 		Input:              iterator,
 		UrlTemplate:        "stories/get_related_bugs",
 		Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
diff --git a/plugins/tapd/tasks/story_commit_collector.go b/plugins/tapd/tasks/story_commit_collector.go
index e6a440ba..6dd50265 100644
--- a/plugins/tapd/tasks/story_commit_collector.go
+++ b/plugins/tapd/tasks/story_commit_collector.go
@@ -47,7 +47,6 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect issueCommits")
-	num := 0
 	since := data.Since
 	incremental := false
 	if since == nil {
@@ -103,11 +102,6 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
 			var data struct {
 				Stories []json.RawMessage `json:"data"`
 			}
-			if len(data.Stories) > 0 {
-				fmt.Println(len(data.Stories))
-				num += len(data.Stories)
-				fmt.Printf("num is %d", num)
-			}
 			err := helper.UnmarshalResponse(res, &data)
 			return data.Stories, err
 		},
diff --git a/plugins/tapd/tasks/task_commit_collector.go b/plugins/tapd/tasks/task_commit_collector.go
index d171cdff..cdd6e295 100644
--- a/plugins/tapd/tasks/task_commit_collector.go
+++ b/plugins/tapd/tasks/task_commit_collector.go
@@ -47,7 +47,6 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect issueCommits")
-	num := 0
 	since := data.Since
 	incremental := false
 	if since == nil {
@@ -103,11 +102,6 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
 			var data struct {
 				Stories []json.RawMessage `json:"data"`
 			}
-			if len(data.Stories) > 0 {
-				fmt.Println(len(data.Stories))
-				num += len(data.Stories)
-				fmt.Printf("num is %d", num)
-			}
 			err := helper.UnmarshalResponse(res, &data)
 			return data.Stories, err
 		},