You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/03/22 11:16:26 UTC
[incubator-devlake] branch main updated: refactor: simplify diffSync logic for jira worklog/changelog/remotelink collectors (#4729)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 771bcb30f refactor: simplify diffSync logic for jira worklog/changelog/remotelink collectors (#4729)
771bcb30f is described below
commit 771bcb30f0fe4a2c4c9380a6c2c4b013e6751970
Author: abeizn <zi...@merico.dev>
AuthorDate: Wed Mar 22 19:16:20 2023 +0800
refactor: simplify diffSync logic for jira worklog/changelog/remotelink collectors (#4729)
---
.../jira/tasks/issue_changelog_collector.go | 24 +++++++---------------
backend/plugins/jira/tasks/remotelink_collector.go | 23 ++++++++-------------
backend/plugins/jira/tasks/worklog_collector.go | 24 +++++++---------------
3 files changed, 22 insertions(+), 49 deletions(-)
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 6abb101d3..be9b06f32 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -65,28 +65,18 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- // query for issue_ids that needed changelog collection
clauses := []dal.Clause{
- dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.Select("i.issue_id AS issue_id, i.updated AS update_time"),
dal.From("_tool_jira_board_issues bi"),
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
- dal.Join("LEFT JOIN _tool_jira_issue_changelogs c ON (c.connection_id = i.connection_id AND c.issue_id = i.issue_id)"),
- dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? AND i.std_type != ? ", data.Options.ConnectionId, data.Options.BoardId, "Epic"),
- dal.Groupby("i.issue_id, i.updated"),
+ dal.Where("bi.connection_id=? and bi.board_id = ? AND i.std_type != ?", data.Options.ConnectionId, data.Options.BoardId, "Epic"),
}
incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(c.issue_updated) OR (max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
- } else {
- /*
- i.updated > max(rl.issue_updated) was deleted because for non-incremental collection,
- max(rl.issue_updated) will only be one of null, less or equal to i.updated
- so i.updated > max(rl.issue_updated) is always false.
- max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0 infers the issue has more than 100 changelogs,
- because we collected changelogs when collecting issues, and assign changelog.issue_updated if num of changelogs < 100,
- and max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0 means all changelogs for the issue were not assigned issue_updated
- */
- clauses = append(clauses, dal.Having("max(c.issue_updated) IS NULL AND COUNT(c.changelog_id) > 0"))
+ if incremental && collectorWithState.LatestState.LatestSuccessStart != nil {
+ clauses = append(
+ clauses,
+ dal.Where("i.updated > ?", collectorWithState.LatestState.LatestSuccessStart),
+ )
}
if logger.IsLevelEnabled(log.LOG_DEBUG) {
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go b/backend/plugins/jira/tasks/remotelink_collector.go
index 75ed32938..c6bfc9286 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -60,26 +60,19 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) errors.Error {
}
clauses := []dal.Clause{
- dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.Select("i.issue_id AS issue_id, i.updated AS update_time"),
dal.From("_tool_jira_board_issues bi"),
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
- dal.Join("LEFT JOIN _tool_jira_remotelinks rl ON (rl.connection_id = i.connection_id AND rl.issue_id = i.issue_id)"),
- dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
- dal.Groupby("i.issue_id, i.updated"),
+ dal.Where("bi.connection_id=? and bi.board_id = ?", data.Options.ConnectionId, data.Options.BoardId),
}
incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL)", collectorWithState.LatestState.LatestSuccessStart))
+ if incremental && collectorWithState.LatestState.LatestSuccessStart != nil {
+ clauses = append(
+ clauses,
+ dal.Where("i.updated > ?", collectorWithState.LatestState.LatestSuccessStart),
+ )
}
- /*
- i.updated > max(rl.issue_updated) was deleted because for non-incremental collection, max(rl.issue_updated) is always null.
- so i.updated > max(rl.issue_updated) is constantly false
- also, for the first collection, max(rl.issue_updated) is always null as there is no data in _tool_jira_remotelinks.
- In conclusion, we don't need the following clause
- */
- //else {
- // clauses = append(clauses, dal.Having("i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL "))
- //}
+
cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error(err, "collect remotelink error")
diff --git a/backend/plugins/jira/tasks/worklog_collector.go b/backend/plugins/jira/tasks/worklog_collector.go
index bd536eb47..8e26f708d 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -57,28 +57,18 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- // filter out issue_ids that needed collection
clauses := []dal.Clause{
- dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.Select("i.issue_id AS issue_id, i.updated AS update_time"),
dal.From("_tool_jira_board_issues bi"),
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
- dal.Join("LEFT JOIN _tool_jira_worklogs wl ON (wl.connection_id = i.connection_id AND wl.issue_id = i.issue_id)"),
- dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
- dal.Groupby("i.issue_id, i.updated"),
+ dal.Where("bi.connection_id=? and bi.board_id = ?", data.Options.ConnectionId, data.Options.BoardId),
}
incremental := collectorWithState.IsIncremental()
- if incremental {
- clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0))", collectorWithState.LatestState.LatestSuccessStart))
- } else {
- /*
- i.updated > max(rl.issue_updated) was deleted because for non-incremental collection,
- max(rl.issue_updated) will only be one of null, less or equal to i.updated
- so i.updated > max(rl.issue_updated) is always false.
- max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) > 0 infers the issue has more than 100 worklogs,
- because we collected worklogs when collecting issues, and assign worklog.issue_updated if num of worklogs < 100,
- and max(c.issue_updated) IS NULL AND COUNT(c.worklog_id) > 0 means all worklogs for the issue were not assigned issue_updated
- */
- clauses = append(clauses, dal.Having("max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0"))
+ if incremental && collectorWithState.LatestState.LatestSuccessStart != nil {
+ clauses = append(
+ clauses,
+ dal.Where("i.updated > ?", collectorWithState.LatestState.LatestSuccessStart),
+ )
}
// construct the input iterator