You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/01 07:34:23 UTC
[incubator-devlake] branch main updated: fix(tapd): add incremental support (#3632)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new a7bb7ef5 fix(tapd): add incremental support (#3632)
a7bb7ef5 is described below
commit a7bb7ef568843488eb40b7f5f7c83ef1cedc9ee5
Author: Warren Chen <yi...@merico.dev>
AuthorDate: Tue Nov 1 15:34:19 2022 +0800
fix(tapd): add incremental support (#3632)
closes #3631
---
plugins/tapd/tasks/bug_commit_collector.go | 6 ------
plugins/tapd/tasks/story_bug_collector.go | 26 ++++++++++++++++++++++++++
plugins/tapd/tasks/story_commit_collector.go | 6 ------
plugins/tapd/tasks/task_commit_collector.go | 6 ------
4 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/plugins/tapd/tasks/bug_commit_collector.go b/plugins/tapd/tasks/bug_commit_collector.go
index 1c0ac014..5129db2f 100644
--- a/plugins/tapd/tasks/bug_commit_collector.go
+++ b/plugins/tapd/tasks/bug_commit_collector.go
@@ -47,7 +47,6 @@ func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- num := 0
since := data.Since
incremental := false
if since == nil {
@@ -103,11 +102,6 @@ func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
var data struct {
Stories []json.RawMessage `json:"data"`
}
- if len(data.Stories) > 0 {
- fmt.Println(len(data.Stories))
- num += len(data.Stories)
- fmt.Printf("num is %d", num)
- }
err := helper.UnmarshalResponse(res, &data)
return data.Stories, err
},
diff --git a/plugins/tapd/tasks/story_bug_collector.go b/plugins/tapd/tasks/story_bug_collector.go
index 764299b1..18db3416 100644
--- a/plugins/tapd/tasks/story_bug_collector.go
+++ b/plugins/tapd/tasks/story_bug_collector.go
@@ -18,10 +18,13 @@ limitations under the License.
package tasks
import (
+ goerror "errors"
"fmt"
"github.com/apache/incubator-devlake/errors"
+ "gorm.io/gorm"
"net/url"
"reflect"
+ "time"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
@@ -38,11 +41,33 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect storyBugs")
+ since := data.Since
+ incremental := false
+ if since == nil {
+ // user didn't specify a time range to sync, try load from database
+ var latestUpdated models.TapdStoryCommit
+ clauses := []dal.Clause{
+ dal.Where("connection_id = ? and workspace_id = ?", data.Options.ConnectionId, data.Options.WorkspaceId),
+ dal.Orderby("created DESC"),
+ }
+ err := db.First(&latestUpdated, clauses...)
+ if err != nil && !goerror.Is(err, gorm.ErrRecordNotFound) {
+ return errors.NotFound.Wrap(err, "failed to get latest tapd changelog record")
+ }
+ if latestUpdated.Id > 0 {
+ since = (*time.Time)(latestUpdated.Created)
+ incremental = true
+ }
+ }
clauses := []dal.Clause{
+ dal.Select("id"),
dal.From(&models.TapdStory{}),
dal.Where("connection_id = ? and workspace_id = ?", data.Options.ConnectionId, data.Options.WorkspaceId),
}
+ if since != nil {
+ clauses = append(clauses, dal.Where("modified > ?", since))
+ }
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -55,6 +80,7 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
+ Incremental: incremental,
Input: iterator,
UrlTemplate: "stories/get_related_bugs",
Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
diff --git a/plugins/tapd/tasks/story_commit_collector.go b/plugins/tapd/tasks/story_commit_collector.go
index e6a440ba..6dd50265 100644
--- a/plugins/tapd/tasks/story_commit_collector.go
+++ b/plugins/tapd/tasks/story_commit_collector.go
@@ -47,7 +47,6 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- num := 0
since := data.Since
incremental := false
if since == nil {
@@ -103,11 +102,6 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
var data struct {
Stories []json.RawMessage `json:"data"`
}
- if len(data.Stories) > 0 {
- fmt.Println(len(data.Stories))
- num += len(data.Stories)
- fmt.Printf("num is %d", num)
- }
err := helper.UnmarshalResponse(res, &data)
return data.Stories, err
},
diff --git a/plugins/tapd/tasks/task_commit_collector.go b/plugins/tapd/tasks/task_commit_collector.go
index d171cdff..cdd6e295 100644
--- a/plugins/tapd/tasks/task_commit_collector.go
+++ b/plugins/tapd/tasks/task_commit_collector.go
@@ -47,7 +47,6 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- num := 0
since := data.Since
incremental := false
if since == nil {
@@ -103,11 +102,6 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
var data struct {
Stories []json.RawMessage `json:"data"`
}
- if len(data.Stories) > 0 {
- fmt.Println(len(data.Stories))
- num += len(data.Stories)
- fmt.Printf("num is %d", num)
- }
err := helper.UnmarshalResponse(res, &data)
return data.Stories, err
},