You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/21 05:50:46 UTC
[incubator-devlake] branch main updated: Refactor collector of worklogs and remotelinks (#2274)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 45ea58b4 Refactor collector of worklogs and remotelinks (#2274)
45ea58b4 is described below
commit 45ea58b4019d4f33698f2a3e37d2d375d6feb594
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Tue Jun 21 13:50:42 2022 +0800
Refactor collector of worklogs and remotelinks (#2274)
* refactor: issue status transformation rules
* refactor: worklogs and remotelinks collector
---
plugins/jira/jira.go | 1 +
plugins/jira/models/issue.go | 4 --
.../migrationscripts/updateSchemas20220620.go | 75 ++++++++++++++++++++++
plugins/jira/models/remotelink.go | 2 +
plugins/jira/models/worklog.go | 1 +
plugins/jira/tasks/apiv2models/changelog.go | 4 +-
plugins/jira/tasks/apiv2models/issue.go | 26 ++++----
plugins/jira/tasks/apiv2models/worklog.go | 4 +-
plugins/jira/tasks/changelog_convertor.go | 15 +----
plugins/jira/tasks/changelog_extractor.go | 2 +-
plugins/jira/tasks/issue_extractor.go | 24 ++-----
plugins/jira/tasks/remotelink_collector.go | 40 +++++-------
plugins/jira/tasks/remotelink_extractor.go | 21 +++---
plugins/jira/tasks/shared.go | 17 +----
plugins/jira/tasks/worklog_collector.go | 25 ++++----
plugins/jira/tasks/worklog_extractor.go | 10 +--
16 files changed, 148 insertions(+), 123 deletions(-)
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 11f26281..97abb9d8 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -170,6 +170,7 @@ func (plugin Jira) MigrationScripts() []migration.Script {
new(migrationscripts.UpdateSchemas20220614),
new(migrationscripts.UpdateSchemas20220615),
new(migrationscripts.UpdateSchemas20220616),
+ new(migrationscripts.UpdateSchemas20220620),
}
}
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index ae74e3ea..1a18a4db 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -62,10 +62,6 @@ type JiraIssue struct {
StdType string `gorm:"type:varchar(255)"`
StdStatus string `gorm:"type:varchar(255)"`
AllFields datatypes.JSONMap
-
- // internal status tracking
- RemotelinkUpdated *time.Time
- WorklogUpdated *time.Time
common.NoPKModel
}
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220620.go b/plugins/jira/models/migrationscripts/updateSchemas20220620.go
new file mode 100644
index 00000000..70fc52fc
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220620.go
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "context"
+ "gorm.io/gorm"
+ "time"
+)
+
+type UpdateSchemas20220620 struct {
+}
+
+type JiraIssue20220620 struct{}
+
+func (JiraIssue20220620) TableName() string {
+ return "_tool_jira_issues"
+}
+
+type JiraWorklog20220620 struct {
+ IssueUpdated *time.Time
+}
+
+func (JiraWorklog20220620) TableName() string {
+ return "_tool_jira_worklogs"
+}
+
+type JiraRemotelink20220620 struct {
+ IssueUpdated *time.Time
+}
+
+func (JiraRemotelink20220620) TableName() string {
+ return "_tool_jira_remotelinks"
+}
+
+func (*UpdateSchemas20220620) Up(ctx context.Context, db *gorm.DB) error {
+ var err error
+ err = db.Migrator().DropColumn(&JiraIssue20220620{}, "worklog_updated")
+ if err != nil {
+ return err
+ }
+ err = db.Migrator().DropColumn(&JiraIssue20220620{}, "remotelink_updated")
+ if err != nil {
+ return err
+ }
+ err = db.Migrator().AutoMigrate(&JiraWorklog20220620{}, &JiraRemotelink20220620{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (*UpdateSchemas20220620) Version() uint64 {
+ return 20220620101111
+}
+
+func (*UpdateSchemas20220620) Name() string {
+ return "add column issue_updated to _tool_jira_worklogs and _tool_jira_remotelinks"
+}
diff --git a/plugins/jira/models/remotelink.go b/plugins/jira/models/remotelink.go
index de624929..f543b5b3 100644
--- a/plugins/jira/models/remotelink.go
+++ b/plugins/jira/models/remotelink.go
@@ -20,6 +20,7 @@ package models
import (
"github.com/apache/incubator-devlake/models/common"
"gorm.io/datatypes"
+ "time"
)
type JiraRemotelink struct {
@@ -33,6 +34,7 @@ type JiraRemotelink struct {
Self string `gorm:"type:varchar(255)"`
Title string
Url string `gorm:"type:varchar(255)"`
+ IssueUpdated *time.Time
}
func (JiraRemotelink) TableName() string {
diff --git a/plugins/jira/models/worklog.go b/plugins/jira/models/worklog.go
index 7ecafae8..e340013d 100644
--- a/plugins/jira/models/worklog.go
+++ b/plugins/jira/models/worklog.go
@@ -34,6 +34,7 @@ type JiraWorklog struct {
TimeSpentSeconds int
Updated time.Time
Started time.Time
+ IssueUpdated *time.Time
}
func (JiraWorklog) TableName() string {
diff --git a/plugins/jira/tasks/apiv2models/changelog.go b/plugins/jira/tasks/apiv2models/changelog.go
index bfe66897..3d12212a 100644
--- a/plugins/jira/tasks/apiv2models/changelog.go
+++ b/plugins/jira/tasks/apiv2models/changelog.go
@@ -20,6 +20,7 @@ package apiv2models
import (
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
+ "time"
)
type Changelog struct {
@@ -29,7 +30,7 @@ type Changelog struct {
Items []ChangelogItem `json:"items"`
}
-func (c Changelog) ToToolLayer(connectionId, issueId uint64) (*models.JiraChangelog, *models.JiraUser) {
+func (c Changelog) ToToolLayer(connectionId, issueId uint64, issueUpdated *time.Time) (*models.JiraChangelog, *models.JiraUser) {
return &models.JiraChangelog{
ConnectionId: connectionId,
ChangelogId: c.ID,
@@ -38,6 +39,7 @@ func (c Changelog) ToToolLayer(connectionId, issueId uint64) (*models.JiraChange
AuthorDisplayName: c.Author.DisplayName,
AuthorActive: c.Author.Active,
Created: c.Created.ToTime(),
+ IssueUpdated: issueUpdated,
}, c.Author.ToToolLayer(connectionId)
}
diff --git a/plugins/jira/tasks/apiv2models/issue.go b/plugins/jira/tasks/apiv2models/issue.go
index a110bbd6..52dd60c6 100644
--- a/plugins/jira/tasks/apiv2models/issue.go
+++ b/plugins/jira/tasks/apiv2models/issue.go
@@ -19,8 +19,8 @@ package apiv2models
import (
"encoding/json"
-
"gorm.io/datatypes"
+ "time"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -224,26 +224,30 @@ func (i *Issue) SetAllFields(raw datatypes.JSON) error {
return nil
}
-func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue, bool, []*models.JiraWorklog, []*models.JiraChangelog, []*models.JiraChangelogItem, []*models.JiraUser) {
+func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue, []*models.JiraWorklog, []*models.JiraChangelog, []*models.JiraChangelogItem, []*models.JiraUser) {
issue := i.toToolLayer(connectionId)
var worklogs []*models.JiraWorklog
var changelogs []*models.JiraChangelog
var changelogItems []*models.JiraChangelogItem
var users []*models.JiraUser
- var needCollectWorklog bool
var sprints []uint64
+
if i.Fields.Worklog != nil {
- if i.Fields.Worklog.Total > len(i.Fields.Worklog.Worklogs) {
- needCollectWorklog = true
- } else {
- for _, w := range i.Fields.Worklog.Worklogs {
- worklogs = append(worklogs, w.ToToolLayer(connectionId))
- }
+ var issueUpdated *time.Time
+ if len(i.Fields.Worklog.Worklogs) <= i.Fields.Worklog.Total {
+ issueUpdated = i.Fields.Updated.ToNullableTime()
+ }
+ for _, w := range i.Fields.Worklog.Worklogs {
+ worklogs = append(worklogs, w.ToToolLayer(connectionId, issueUpdated))
}
}
if i.Changelog != nil {
+ var issueUpdated *time.Time
+ if len(i.Changelog.Histories) < 100 {
+ issueUpdated = i.Fields.Updated.ToNullableTime()
+ }
for _, changelog := range i.Changelog.Histories {
- cl, user := changelog.ToToolLayer(connectionId, i.ID)
+ cl, user := changelog.ToToolLayer(connectionId, i.ID, issueUpdated)
changelogs = append(changelogs, cl)
users = append(users, user)
for _, item := range changelog.Items {
@@ -262,5 +266,5 @@ func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue
if i.Fields.Assignee != nil {
users = append(users, i.Fields.Assignee.ToToolLayer(connectionId))
}
- return sprints, issue, needCollectWorklog, worklogs, changelogs, changelogItems, users
+ return sprints, issue, worklogs, changelogs, changelogItems, users
}
diff --git a/plugins/jira/tasks/apiv2models/worklog.go b/plugins/jira/tasks/apiv2models/worklog.go
index 8ac25d24..db8cddac 100644
--- a/plugins/jira/tasks/apiv2models/worklog.go
+++ b/plugins/jira/tasks/apiv2models/worklog.go
@@ -19,6 +19,7 @@ package apiv2models
import (
"github.com/apache/incubator-devlake/plugins/helper"
+ "time"
"github.com/apache/incubator-devlake/plugins/jira/models"
)
@@ -37,7 +38,7 @@ type Worklog struct {
IssueID uint64 `json:"issueId,string"`
}
-func (w Worklog) ToToolLayer(connectionId uint64) *models.JiraWorklog {
+func (w Worklog) ToToolLayer(connectionId uint64, issueUpdated *time.Time) *models.JiraWorklog {
result := &models.JiraWorklog{
ConnectionId: connectionId,
IssueId: w.IssueID,
@@ -46,6 +47,7 @@ func (w Worklog) ToToolLayer(connectionId uint64) *models.JiraWorklog {
TimeSpentSeconds: w.TimeSpentSeconds,
Updated: w.Updated.ToTime(),
Started: w.Started.ToTime(),
+ IssueUpdated: issueUpdated,
}
if w.Author != nil {
result.AuthorId = w.Author.EmailAddress
diff --git a/plugins/jira/tasks/changelog_convertor.go b/plugins/jira/tasks/changelog_convertor.go
index b3e08c6c..c1458e76 100644
--- a/plugins/jira/tasks/changelog_convertor.go
+++ b/plugins/jira/tasks/changelog_convertor.go
@@ -47,11 +47,6 @@ func ConvertChangelogs(taskCtx core.SubTaskContext) error {
logger := taskCtx.GetLogger()
db := taskCtx.GetDal()
logger.Info("covert changelog")
- statusMap, err := GetStatusInfo(taskCtx.GetDb())
- if err != nil {
- logger.Error(err.Error())
- return err
- }
// select all changelogs belongs to the board
clauses := []dal.Clause{
dal.Select("_tool_jira_changelog_items.*, _tool_jira_changelogs.issue_id, author_account_id, author_display_name, created"),
@@ -123,14 +118,8 @@ func ConvertChangelogs(taskCtx core.SubTaskContext) error {
}
}
if row.Field == "status" {
- fromStatus, ok := statusMap[row.FromString]
- if ok {
- changelog.FromValue = GetStdStatus(fromStatus.StatusCategory)
- }
- toStatus, ok := statusMap[row.ToString]
- if ok {
- changelog.ToValue = GetStdStatus(toStatus.StatusCategory)
- }
+ changelog.FromValue = getStdStatus(row.FromString)
+ changelog.ToValue = getStdStatus(row.ToString)
}
return []interface{}{changelog}, nil
},
diff --git a/plugins/jira/tasks/changelog_extractor.go b/plugins/jira/tasks/changelog_extractor.go
index 435cf501..211744f2 100644
--- a/plugins/jira/tasks/changelog_extractor.go
+++ b/plugins/jira/tasks/changelog_extractor.go
@@ -57,7 +57,7 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
}
// prepare output
var result []interface{}
- cl, user := changelog.ToToolLayer(connectionId, input.IssueId)
+ cl, user := changelog.ToToolLayer(connectionId, input.IssueId, &input.UpdateTime)
// this is crucial for incremental update
cl.IssueUpdated = &input.UpdateTime
// collect changelog / user inforation
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index 431d6131..63e828af 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -23,7 +23,6 @@ import (
"strings"
"time"
- "github.com/apache/incubator-devlake/models/domainlayer/ticket"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -50,22 +49,6 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
for _, userType := range data.Options.IssueExtraction.IncidentTypeMapping {
typeMappings[userType] = "INCIDENT"
}
- getStdType := func(userType string) string {
- stdType := typeMappings[userType]
- if stdType == "" {
- return strings.ToUpper(userType)
- }
- return strings.ToUpper(stdType)
- }
- getStdStatus := func(statusKey string) string {
- if statusKey == "done" {
- return ticket.DONE
- } else if statusKey == "new" {
- return ticket.TODO
- } else {
- return ticket.IN_PROGRESS
- }
- }
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
@@ -94,7 +77,7 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
return nil, err
}
var results []interface{}
- sprints, issue, _, worklogs, changelogs, changelogItems, users := apiIssue.ExtractEntities(data.Connection.ID)
+ sprints, issue, worklogs, changelogs, changelogItems, users := apiIssue.ExtractEntities(data.Connection.ID)
for _, sprintId := range sprints {
sprintIssue := &models.JiraSprintIssue{
ConnectionId: data.Connection.ID,
@@ -113,7 +96,10 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
issue.StoryPoint, _ = strconv.ParseFloat(strStoryPoint, 32)
}
issue.StdStoryPoint = uint(issue.StoryPoint)
- issue.StdType = getStdType(issue.Type)
+ issue.StdType = typeMappings[issue.Type]
+ if issue.StdType == "" {
+ issue.StdType = strings.ToUpper(issue.Type)
+ }
issue.StdStatus = getStdStatus(issue.StatusKey)
results = append(results, issue)
for _, worklog := range worklogs {
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index f1e2c7bf..0e6d55f5 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -23,7 +23,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
- . "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
)
@@ -38,28 +38,21 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
logger := taskCtx.GetLogger()
logger.Info("collect remotelink")
- /*
- `CollectIssues` will take into account of `since` option and set the `updated` field for issues that have
- updates, So when it comes to collecting remotelinks, we only need to compare an issue's `updated` field with its
- `remotelink_updated` field. If `remotelink_updated` is older, then we'll collect remotelinks for this issue and
- set its `remotelink_updated` to `updated` at the end.
- */
- cursor, err := db.Cursor(
- Select("i.issue_id, NOW() AS update_time"),
- From("_tool_jira_remotelinks i"),
- Join(`LEFT JOIN _tool_jira_board_issues bi ON (
- bi.connection_id = i.connection_id AND
- bi.issue_id = i.issue_id
- )`),
- Where(`
- bi.connection_id = ? AND
- bi.board_id = ? AND
- (i.remotelink_updated IS NULL OR i.remotelink_updated < i.updated)
- `,
- data.Options.ConnectionId,
- data.Options.BoardId,
- ),
- )
+ clauses := []dal.Clause{
+ dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.From("_tool_jira_board_issues bi"),
+ dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+ dal.Join("LEFT JOIN _tool_jira_remotelinks rl ON (rl.connection_id = i.connection_id AND rl.issue_id = i.issue_id)"),
+ dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
+ dal.Groupby("i.issue_id, i.updated"),
+ dal.Having("i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL"),
+ }
+ // apply time range if any
+ since := data.Since
+ if since != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?", *since))
+ }
+ cursor, err := db.Cursor(clauses...)
if err != nil {
logger.Error("collect remotelink error:%v", err)
return err
@@ -82,6 +75,7 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
},
ApiClient: data.ApiClient,
Input: iterator,
+ Incremental: since == nil,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
if res.StatusCode == http.StatusNotFound {
diff --git a/plugins/jira/tasks/remotelink_extractor.go b/plugins/jira/tasks/remotelink_extractor.go
index 52fb9ca0..b5188596 100644
--- a/plugins/jira/tasks/remotelink_extractor.go
+++ b/plugins/jira/tasks/remotelink_extractor.go
@@ -22,6 +22,7 @@ import (
"regexp"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -32,7 +33,7 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
connectionId := data.Connection.ID
boardId := data.Options.BoardId
logger := taskCtx.GetLogger()
- db := taskCtx.GetDb()
+ db := taskCtx.GetDal()
logger.Info("extract remote links")
var commitShaRegex *regexp.Regexp
if pattern := data.Connection.RemotelinkCommitShaPattern; pattern != "" {
@@ -40,11 +41,13 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
}
// select all remotelinks belongs to the board, cursor is important for low memory footprint
- cursor, err := db.Model(&models.JiraRemotelink{}).
- Select("_tool_jira_remotelinks.*").
- Joins("left join _tool_jira_board_issues on _tool_jira_board_issues.issue_id = _tool_jira_remotelinks.issue_id").
- Where("_tool_jira_board_issues.board_id = ? AND _tool_jira_board_issues.connection_id = ?", boardId, connectionId).
- Rows()
+ clauses := []dal.Clause{
+ dal.From(&models.JiraRemotelink{}),
+ dal.Select("*"),
+ dal.Join("left join _tool_jira_board_issues on _tool_jira_board_issues.issue_id = _tool_jira_remotelinks.issue_id"),
+ dal.Where("_tool_jira_board_issues.board_id = ? AND _tool_jira_board_issues.connection_id = ?", boardId, connectionId),
+ }
+ cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
@@ -71,11 +74,6 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
if err != nil {
return nil, err
}
- issue := &models.JiraIssue{ConnectionId: connectionId, IssueId: input.IssueId}
- err = db.Model(issue).Update("remotelink_updated", input.UpdateTime).Error
- if err != nil {
- return nil, err
- }
remotelink := &models.JiraRemotelink{
ConnectionId: connectionId,
RemotelinkId: raw.ID,
@@ -83,6 +81,7 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
Self: raw.Self,
Title: raw.Object.Title,
Url: raw.Object.URL,
+ IssueUpdated: &input.UpdateTime,
}
result = append(result, remotelink)
if commitShaRegex != nil {
diff --git a/plugins/jira/tasks/shared.go b/plugins/jira/tasks/shared.go
index 9f6dfc99..2de4a601 100644
--- a/plugins/jira/tasks/shared.go
+++ b/plugins/jira/tasks/shared.go
@@ -22,8 +22,6 @@ import (
"github.com/apache/incubator-devlake/models/domainlayer/ticket"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/jira/models"
- "gorm.io/gorm"
)
func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs) (int, error) {
@@ -39,7 +37,7 @@ func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs
return pages, nil
}
-func GetStdStatus(statusKey string) string {
+func getStdStatus(statusKey string) string {
if statusKey == "done" {
return ticket.DONE
} else if statusKey == "new" {
@@ -48,16 +46,3 @@ func GetStdStatus(statusKey string) string {
return ticket.IN_PROGRESS
}
}
-
-func GetStatusInfo(db *gorm.DB) (map[string]models.JiraStatus, error) {
- data := make([]models.JiraStatus, 0)
- err := db.Model(&models.JiraStatus{}).Scan(&data).Error
- if err != nil {
- return nil, err
- }
- statusMap := make(map[string]models.JiraStatus)
- for _, v := range data {
- statusMap[v.Name] = v
- }
- return statusMap, nil
-}
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 54f172fb..be54b51f 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -23,7 +23,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
- . "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
)
@@ -38,21 +38,18 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
logger := taskCtx.GetLogger()
// filter out issue_ids that needed collection
- clauses := []Clause{
- Select("bi.issue_id, NOW() AS update_time"),
- From("_tool_jira_board_issues bi"),
- Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
- Where(
- `bi.connection_id = ?
- AND bi.board_id = ?
- AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)`,
- data.Options.ConnectionId,
- data.Options.BoardId,
- ),
+ clauses := []dal.Clause{
+ dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.From("_tool_jira_board_issues bi"),
+ dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+ dal.Join("LEFT JOIN _tool_jira_worklogs wl ON (wl.connection_id = i.connection_id AND wl.issue_id = i.issue_id)"),
+ dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
+ dal.Groupby("i.issue_id, i.updated"),
+ dal.Having("i.updated > max(wl.issue_updated) OR max(wl.issue_updated) IS NULL"),
}
// apply time range if any
if since != nil {
- clauses = append(clauses, Where("i.updated > ?", *since))
+ clauses = append(clauses, dal.Where("i.updated > ?", *since))
}
// construct the input iterator
@@ -78,7 +75,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
PageSize: 50,
- Incremental: true,
+ Incremental: since == nil,
GetTotalPages: GetTotalPagesFromResponse,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 3a973109..93da0932 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -19,8 +19,6 @@ package tasks
import (
"encoding/json"
- "github.com/apache/incubator-devlake/plugins/jira/models"
-
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -30,7 +28,6 @@ var _ core.SubTaskEntryPoint = ExtractWorklogs
func ExtractWorklogs(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
- db := taskCtx.GetDb()
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
@@ -46,17 +43,12 @@ func ExtractWorklogs(taskCtx core.SubTaskContext) error {
if err != nil {
return nil, err
}
- issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
- err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error
- if err != nil {
- return nil, err
- }
var worklog apiv2models.Worklog
err = json.Unmarshal(row.Data, &worklog)
if err != nil {
return nil, err
}
- return []interface{}{worklog.ToToolLayer(data.Connection.ID)}, nil
+ return []interface{}{worklog.ToToolLayer(data.Connection.ID, &input.UpdateTime)}, nil
},
})