You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/21 05:50:46 UTC

[incubator-devlake] branch main updated: Refactor collector of worklogs and remotelinks (#2274)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 45ea58b4 Refactor collector of worklogs and remotelinks (#2274)
45ea58b4 is described below

commit 45ea58b4019d4f33698f2a3e37d2d375d6feb594
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Tue Jun 21 13:50:42 2022 +0800

    Refactor collector of worklogs and remotelinks (#2274)
    
    * refactor: issue status transformation rules
    
    * refactor: worklogs and remotelinks collector
---
 plugins/jira/jira.go                               |  1 +
 plugins/jira/models/issue.go                       |  4 --
 .../migrationscripts/updateSchemas20220620.go      | 75 ++++++++++++++++++++++
 plugins/jira/models/remotelink.go                  |  2 +
 plugins/jira/models/worklog.go                     |  1 +
 plugins/jira/tasks/apiv2models/changelog.go        |  4 +-
 plugins/jira/tasks/apiv2models/issue.go            | 26 ++++----
 plugins/jira/tasks/apiv2models/worklog.go          |  4 +-
 plugins/jira/tasks/changelog_convertor.go          | 15 +----
 plugins/jira/tasks/changelog_extractor.go          |  2 +-
 plugins/jira/tasks/issue_extractor.go              | 24 ++-----
 plugins/jira/tasks/remotelink_collector.go         | 40 +++++-------
 plugins/jira/tasks/remotelink_extractor.go         | 21 +++---
 plugins/jira/tasks/shared.go                       | 17 +----
 plugins/jira/tasks/worklog_collector.go            | 25 ++++----
 plugins/jira/tasks/worklog_extractor.go            | 10 +--
 16 files changed, 148 insertions(+), 123 deletions(-)

diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 11f26281..97abb9d8 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -170,6 +170,7 @@ func (plugin Jira) MigrationScripts() []migration.Script {
 		new(migrationscripts.UpdateSchemas20220614),
 		new(migrationscripts.UpdateSchemas20220615),
 		new(migrationscripts.UpdateSchemas20220616),
+		new(migrationscripts.UpdateSchemas20220620),
 	}
 }
 
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index ae74e3ea..1a18a4db 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -62,10 +62,6 @@ type JiraIssue struct {
 	StdType                  string `gorm:"type:varchar(255)"`
 	StdStatus                string `gorm:"type:varchar(255)"`
 	AllFields                datatypes.JSONMap
-
-	// internal status tracking
-	RemotelinkUpdated *time.Time
-	WorklogUpdated    *time.Time
 	common.NoPKModel
 }
 
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220620.go b/plugins/jira/models/migrationscripts/updateSchemas20220620.go
new file mode 100644
index 00000000..70fc52fc
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220620.go
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"context"
+	"gorm.io/gorm"
+	"time"
+)
+
+type UpdateSchemas20220620 struct {
+}
+
+type JiraIssue20220620 struct{}
+
+func (JiraIssue20220620) TableName() string {
+	return "_tool_jira_issues"
+}
+
+type JiraWorklog20220620 struct {
+	IssueUpdated *time.Time
+}
+
+func (JiraWorklog20220620) TableName() string {
+	return "_tool_jira_worklogs"
+}
+
+type JiraRemotelink20220620 struct {
+	IssueUpdated *time.Time
+}
+
+func (JiraRemotelink20220620) TableName() string {
+	return "_tool_jira_remotelinks"
+}
+
+func (*UpdateSchemas20220620) Up(ctx context.Context, db *gorm.DB) error {
+	var err error
+	err = db.Migrator().DropColumn(&JiraIssue20220620{}, "worklog_updated")
+	if err != nil {
+		return err
+	}
+	err = db.Migrator().DropColumn(&JiraIssue20220620{}, "remotelink_updated")
+	if err != nil {
+		return err
+	}
+	err = db.Migrator().AutoMigrate(&JiraWorklog20220620{}, &JiraRemotelink20220620{})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (*UpdateSchemas20220620) Version() uint64 {
+	return 20220620101111
+}
+
+func (*UpdateSchemas20220620) Name() string {
+	return "add column issue_updated to _tool_jira_worklogs and _tool_jira_remotelinks"
+}
diff --git a/plugins/jira/models/remotelink.go b/plugins/jira/models/remotelink.go
index de624929..f543b5b3 100644
--- a/plugins/jira/models/remotelink.go
+++ b/plugins/jira/models/remotelink.go
@@ -20,6 +20,7 @@ package models
 import (
 	"github.com/apache/incubator-devlake/models/common"
 	"gorm.io/datatypes"
+	"time"
 )
 
 type JiraRemotelink struct {
@@ -33,6 +34,7 @@ type JiraRemotelink struct {
 	Self         string `gorm:"type:varchar(255)"`
 	Title        string
 	Url          string `gorm:"type:varchar(255)"`
+	IssueUpdated *time.Time
 }
 
 func (JiraRemotelink) TableName() string {
diff --git a/plugins/jira/models/worklog.go b/plugins/jira/models/worklog.go
index 7ecafae8..e340013d 100644
--- a/plugins/jira/models/worklog.go
+++ b/plugins/jira/models/worklog.go
@@ -34,6 +34,7 @@ type JiraWorklog struct {
 	TimeSpentSeconds int
 	Updated          time.Time
 	Started          time.Time
+	IssueUpdated     *time.Time
 }
 
 func (JiraWorklog) TableName() string {
diff --git a/plugins/jira/tasks/apiv2models/changelog.go b/plugins/jira/tasks/apiv2models/changelog.go
index bfe66897..3d12212a 100644
--- a/plugins/jira/tasks/apiv2models/changelog.go
+++ b/plugins/jira/tasks/apiv2models/changelog.go
@@ -20,6 +20,7 @@ package apiv2models
 import (
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
+	"time"
 )
 
 type Changelog struct {
@@ -29,7 +30,7 @@ type Changelog struct {
 	Items   []ChangelogItem    `json:"items"`
 }
 
-func (c Changelog) ToToolLayer(connectionId, issueId uint64) (*models.JiraChangelog, *models.JiraUser) {
+func (c Changelog) ToToolLayer(connectionId, issueId uint64, issueUpdated *time.Time) (*models.JiraChangelog, *models.JiraUser) {
 	return &models.JiraChangelog{
 		ConnectionId:      connectionId,
 		ChangelogId:       c.ID,
@@ -38,6 +39,7 @@ func (c Changelog) ToToolLayer(connectionId, issueId uint64) (*models.JiraChange
 		AuthorDisplayName: c.Author.DisplayName,
 		AuthorActive:      c.Author.Active,
 		Created:           c.Created.ToTime(),
+		IssueUpdated:      issueUpdated,
 	}, c.Author.ToToolLayer(connectionId)
 }
 
diff --git a/plugins/jira/tasks/apiv2models/issue.go b/plugins/jira/tasks/apiv2models/issue.go
index a110bbd6..52dd60c6 100644
--- a/plugins/jira/tasks/apiv2models/issue.go
+++ b/plugins/jira/tasks/apiv2models/issue.go
@@ -19,8 +19,8 @@ package apiv2models
 
 import (
 	"encoding/json"
-
 	"gorm.io/datatypes"
+	"time"
 
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -224,26 +224,30 @@ func (i *Issue) SetAllFields(raw datatypes.JSON) error {
 	return nil
 }
 
-func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue, bool, []*models.JiraWorklog, []*models.JiraChangelog, []*models.JiraChangelogItem, []*models.JiraUser) {
+func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue, []*models.JiraWorklog, []*models.JiraChangelog, []*models.JiraChangelogItem, []*models.JiraUser) {
 	issue := i.toToolLayer(connectionId)
 	var worklogs []*models.JiraWorklog
 	var changelogs []*models.JiraChangelog
 	var changelogItems []*models.JiraChangelogItem
 	var users []*models.JiraUser
-	var needCollectWorklog bool
 	var sprints []uint64
+
 	if i.Fields.Worklog != nil {
-		if i.Fields.Worklog.Total > len(i.Fields.Worklog.Worklogs) {
-			needCollectWorklog = true
-		} else {
-			for _, w := range i.Fields.Worklog.Worklogs {
-				worklogs = append(worklogs, w.ToToolLayer(connectionId))
-			}
+		var issueUpdated *time.Time
+		if len(i.Fields.Worklog.Worklogs) <= i.Fields.Worklog.Total {
+			issueUpdated = i.Fields.Updated.ToNullableTime()
+		}
+		for _, w := range i.Fields.Worklog.Worklogs {
+			worklogs = append(worklogs, w.ToToolLayer(connectionId, issueUpdated))
 		}
 	}
 	if i.Changelog != nil {
+		var issueUpdated *time.Time
+		if len(i.Changelog.Histories) < 100 {
+			issueUpdated = i.Fields.Updated.ToNullableTime()
+		}
 		for _, changelog := range i.Changelog.Histories {
-			cl, user := changelog.ToToolLayer(connectionId, i.ID)
+			cl, user := changelog.ToToolLayer(connectionId, i.ID, issueUpdated)
 			changelogs = append(changelogs, cl)
 			users = append(users, user)
 			for _, item := range changelog.Items {
@@ -262,5 +266,5 @@ func (i Issue) ExtractEntities(connectionId uint64) ([]uint64, *models.JiraIssue
 	if i.Fields.Assignee != nil {
 		users = append(users, i.Fields.Assignee.ToToolLayer(connectionId))
 	}
-	return sprints, issue, needCollectWorklog, worklogs, changelogs, changelogItems, users
+	return sprints, issue, worklogs, changelogs, changelogItems, users
 }
diff --git a/plugins/jira/tasks/apiv2models/worklog.go b/plugins/jira/tasks/apiv2models/worklog.go
index 8ac25d24..db8cddac 100644
--- a/plugins/jira/tasks/apiv2models/worklog.go
+++ b/plugins/jira/tasks/apiv2models/worklog.go
@@ -19,6 +19,7 @@ package apiv2models
 
 import (
 	"github.com/apache/incubator-devlake/plugins/helper"
+	"time"
 
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 )
@@ -37,7 +38,7 @@ type Worklog struct {
 	IssueID          uint64             `json:"issueId,string"`
 }
 
-func (w Worklog) ToToolLayer(connectionId uint64) *models.JiraWorklog {
+func (w Worklog) ToToolLayer(connectionId uint64, issueUpdated *time.Time) *models.JiraWorklog {
 	result := &models.JiraWorklog{
 		ConnectionId:     connectionId,
 		IssueId:          w.IssueID,
@@ -46,6 +47,7 @@ func (w Worklog) ToToolLayer(connectionId uint64) *models.JiraWorklog {
 		TimeSpentSeconds: w.TimeSpentSeconds,
 		Updated:          w.Updated.ToTime(),
 		Started:          w.Started.ToTime(),
+		IssueUpdated:     issueUpdated,
 	}
 	if w.Author != nil {
 		result.AuthorId = w.Author.EmailAddress
diff --git a/plugins/jira/tasks/changelog_convertor.go b/plugins/jira/tasks/changelog_convertor.go
index b3e08c6c..c1458e76 100644
--- a/plugins/jira/tasks/changelog_convertor.go
+++ b/plugins/jira/tasks/changelog_convertor.go
@@ -47,11 +47,6 @@ func ConvertChangelogs(taskCtx core.SubTaskContext) error {
 	logger := taskCtx.GetLogger()
 	db := taskCtx.GetDal()
 	logger.Info("covert changelog")
-	statusMap, err := GetStatusInfo(taskCtx.GetDb())
-	if err != nil {
-		logger.Error(err.Error())
-		return err
-	}
 	// select all changelogs belongs to the board
 	clauses := []dal.Clause{
 		dal.Select("_tool_jira_changelog_items.*, _tool_jira_changelogs.issue_id, author_account_id, author_display_name, created"),
@@ -123,14 +118,8 @@ func ConvertChangelogs(taskCtx core.SubTaskContext) error {
 				}
 			}
 			if row.Field == "status" {
-				fromStatus, ok := statusMap[row.FromString]
-				if ok {
-					changelog.FromValue = GetStdStatus(fromStatus.StatusCategory)
-				}
-				toStatus, ok := statusMap[row.ToString]
-				if ok {
-					changelog.ToValue = GetStdStatus(toStatus.StatusCategory)
-				}
+				changelog.FromValue = getStdStatus(row.FromString)
+				changelog.ToValue = getStdStatus(row.ToString)
 			}
 			return []interface{}{changelog}, nil
 		},
diff --git a/plugins/jira/tasks/changelog_extractor.go b/plugins/jira/tasks/changelog_extractor.go
index 435cf501..211744f2 100644
--- a/plugins/jira/tasks/changelog_extractor.go
+++ b/plugins/jira/tasks/changelog_extractor.go
@@ -57,7 +57,7 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
 			}
 			// prepare output
 			var result []interface{}
-			cl, user := changelog.ToToolLayer(connectionId, input.IssueId)
+			cl, user := changelog.ToToolLayer(connectionId, input.IssueId, &input.UpdateTime)
 			// this is crucial for incremental update
 			cl.IssueUpdated = &input.UpdateTime
 			// collect changelog / user inforation
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index 431d6131..63e828af 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -23,7 +23,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/apache/incubator-devlake/models/domainlayer/ticket"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -50,22 +49,6 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 	for _, userType := range data.Options.IssueExtraction.IncidentTypeMapping {
 		typeMappings[userType] = "INCIDENT"
 	}
-	getStdType := func(userType string) string {
-		stdType := typeMappings[userType]
-		if stdType == "" {
-			return strings.ToUpper(userType)
-		}
-		return strings.ToUpper(stdType)
-	}
-	getStdStatus := func(statusKey string) string {
-		if statusKey == "done" {
-			return ticket.DONE
-		} else if statusKey == "new" {
-			return ticket.TODO
-		} else {
-			return ticket.IN_PROGRESS
-		}
-	}
 
 	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
 		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
@@ -94,7 +77,7 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 				return nil, err
 			}
 			var results []interface{}
-			sprints, issue, _, worklogs, changelogs, changelogItems, users := apiIssue.ExtractEntities(data.Connection.ID)
+			sprints, issue, worklogs, changelogs, changelogItems, users := apiIssue.ExtractEntities(data.Connection.ID)
 			for _, sprintId := range sprints {
 				sprintIssue := &models.JiraSprintIssue{
 					ConnectionId:     data.Connection.ID,
@@ -113,7 +96,10 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 				issue.StoryPoint, _ = strconv.ParseFloat(strStoryPoint, 32)
 			}
 			issue.StdStoryPoint = uint(issue.StoryPoint)
-			issue.StdType = getStdType(issue.Type)
+			issue.StdType = typeMappings[issue.Type]
+			if issue.StdType == "" {
+				issue.StdType = strings.ToUpper(issue.Type)
+			}
 			issue.StdStatus = getStdStatus(issue.StatusKey)
 			results = append(results, issue)
 			for _, worklog := range worklogs {
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index f1e2c7bf..0e6d55f5 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -23,7 +23,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	. "github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
 )
@@ -38,28 +38,21 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
 	logger := taskCtx.GetLogger()
 	logger.Info("collect remotelink")
 
-	/*
-		`CollectIssues` will take into account of `since` option and set the `updated` field for issues that have
-		updates, So when it comes to collecting remotelinks, we only need to compare an issue's `updated` field with its
-		`remotelink_updated` field. If `remotelink_updated` is older, then we'll collect remotelinks for this issue and
-		set its `remotelink_updated` to `updated` at the end.
-	*/
-	cursor, err := db.Cursor(
-		Select("i.issue_id, NOW() AS update_time"),
-		From("_tool_jira_remotelinks i"),
-		Join(`LEFT JOIN _tool_jira_board_issues bi ON (
-			bi.connection_id = i.connection_id AND
-			bi.issue_id = i.issue_id
-		)`),
-		Where(`
-			bi.connection_id = ? AND
-			bi.board_id = ? AND
-			(i.remotelink_updated IS NULL OR i.remotelink_updated < i.updated)
-			`,
-			data.Options.ConnectionId,
-			data.Options.BoardId,
-		),
-	)
+	clauses := []dal.Clause{
+		dal.Select("i.issue_id, i.updated AS update_time"),
+		dal.From("_tool_jira_board_issues bi"),
+		dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+		dal.Join("LEFT JOIN _tool_jira_remotelinks rl ON (rl.connection_id = i.connection_id AND rl.issue_id = i.issue_id)"),
+		dal.Where("i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
+		dal.Groupby("i.issue_id, i.updated"),
+		dal.Having("i.updated > max(rl.issue_updated) OR  max(rl.issue_updated) IS NULL"),
+	}
+	// apply time range if any
+	since := data.Since
+	if since != nil {
+		clauses = append(clauses, dal.Where("i.updated > ?", *since))
+	}
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		logger.Error("collect remotelink error:%v", err)
 		return err
@@ -82,6 +75,7 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
 		},
 		ApiClient:   data.ApiClient,
 		Input:       iterator,
+		Incremental: since == nil,
 		UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			if res.StatusCode == http.StatusNotFound {
diff --git a/plugins/jira/tasks/remotelink_extractor.go b/plugins/jira/tasks/remotelink_extractor.go
index 52fb9ca0..b5188596 100644
--- a/plugins/jira/tasks/remotelink_extractor.go
+++ b/plugins/jira/tasks/remotelink_extractor.go
@@ -22,6 +22,7 @@ import (
 	"regexp"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -32,7 +33,7 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
 	connectionId := data.Connection.ID
 	boardId := data.Options.BoardId
 	logger := taskCtx.GetLogger()
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	logger.Info("extract remote links")
 	var commitShaRegex *regexp.Regexp
 	if pattern := data.Connection.RemotelinkCommitShaPattern; pattern != "" {
@@ -40,11 +41,13 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
 	}
 
 	// select all remotelinks belongs to the board, cursor is important for low memory footprint
-	cursor, err := db.Model(&models.JiraRemotelink{}).
-		Select("_tool_jira_remotelinks.*").
-		Joins("left join _tool_jira_board_issues on _tool_jira_board_issues.issue_id = _tool_jira_remotelinks.issue_id").
-		Where("_tool_jira_board_issues.board_id = ? AND _tool_jira_board_issues.connection_id = ?", boardId, connectionId).
-		Rows()
+	clauses := []dal.Clause{
+		dal.From(&models.JiraRemotelink{}),
+		dal.Select("*"),
+		dal.Join("left join _tool_jira_board_issues on _tool_jira_board_issues.issue_id = _tool_jira_remotelinks.issue_id"),
+		dal.Where("_tool_jira_board_issues.board_id = ? AND _tool_jira_board_issues.connection_id = ?", boardId, connectionId),
+	}
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
@@ -71,11 +74,6 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
 			if err != nil {
 				return nil, err
 			}
-			issue := &models.JiraIssue{ConnectionId: connectionId, IssueId: input.IssueId}
-			err = db.Model(issue).Update("remotelink_updated", input.UpdateTime).Error
-			if err != nil {
-				return nil, err
-			}
 			remotelink := &models.JiraRemotelink{
 				ConnectionId: connectionId,
 				RemotelinkId: raw.ID,
@@ -83,6 +81,7 @@ func ExtractRemotelinks(taskCtx core.SubTaskContext) error {
 				Self:         raw.Self,
 				Title:        raw.Object.Title,
 				Url:          raw.Object.URL,
+				IssueUpdated: &input.UpdateTime,
 			}
 			result = append(result, remotelink)
 			if commitShaRegex != nil {
diff --git a/plugins/jira/tasks/shared.go b/plugins/jira/tasks/shared.go
index 9f6dfc99..2de4a601 100644
--- a/plugins/jira/tasks/shared.go
+++ b/plugins/jira/tasks/shared.go
@@ -22,8 +22,6 @@ import (
 
 	"github.com/apache/incubator-devlake/models/domainlayer/ticket"
 	"github.com/apache/incubator-devlake/plugins/helper"
-	"github.com/apache/incubator-devlake/plugins/jira/models"
-	"gorm.io/gorm"
 )
 
 func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs) (int, error) {
@@ -39,7 +37,7 @@ func GetTotalPagesFromResponse(res *http.Response, args *helper.ApiCollectorArgs
 	return pages, nil
 }
 
-func GetStdStatus(statusKey string) string {
+func getStdStatus(statusKey string) string {
 	if statusKey == "done" {
 		return ticket.DONE
 	} else if statusKey == "new" {
@@ -48,16 +46,3 @@ func GetStdStatus(statusKey string) string {
 		return ticket.IN_PROGRESS
 	}
 }
-
-func GetStatusInfo(db *gorm.DB) (map[string]models.JiraStatus, error) {
-	data := make([]models.JiraStatus, 0)
-	err := db.Model(&models.JiraStatus{}).Scan(&data).Error
-	if err != nil {
-		return nil, err
-	}
-	statusMap := make(map[string]models.JiraStatus)
-	for _, v := range data {
-		statusMap[v.Name] = v
-	}
-	return statusMap, nil
-}
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 54f172fb..be54b51f 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -23,7 +23,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	. "github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
 )
@@ -38,21 +38,18 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
 	logger := taskCtx.GetLogger()
 
 	// filter out issue_ids that needed collection
-	clauses := []Clause{
-		Select("bi.issue_id, NOW() AS update_time"),
-		From("_tool_jira_board_issues bi"),
-		Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
-		Where(
-			`bi.connection_id = ?
-			   AND bi.board_id = ?
-			   AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)`,
-			data.Options.ConnectionId,
-			data.Options.BoardId,
-		),
+	clauses := []dal.Clause{
+		dal.Select("i.issue_id, i.updated AS update_time"),
+		dal.From("_tool_jira_board_issues bi"),
+		dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+		dal.Join("LEFT JOIN _tool_jira_worklogs wl ON (wl.connection_id = i.connection_id AND wl.issue_id = i.issue_id)"),
+		dal.Where("i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
+		dal.Groupby("i.issue_id, i.updated"),
+		dal.Having("i.updated > max(wl.issue_updated) OR  max(wl.issue_updated) IS NULL"),
 	}
 	// apply time range if any
 	if since != nil {
-		clauses = append(clauses, Where("i.updated > ?", *since))
+		clauses = append(clauses, dal.Where("i.updated > ?", *since))
 	}
 
 	// construct the input iterator
@@ -78,7 +75,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) error {
 		ApiClient:     data.ApiClient,
 		UrlTemplate:   "api/2/issue/{{ .Input.IssueId }}/worklog",
 		PageSize:      50,
-		Incremental:   true,
+		Incremental:   since == nil,
 		GetTotalPages: GetTotalPagesFromResponse,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			var data struct {
diff --git a/plugins/jira/tasks/worklog_extractor.go b/plugins/jira/tasks/worklog_extractor.go
index 3a973109..93da0932 100644
--- a/plugins/jira/tasks/worklog_extractor.go
+++ b/plugins/jira/tasks/worklog_extractor.go
@@ -19,8 +19,6 @@ package tasks
 
 import (
 	"encoding/json"
-	"github.com/apache/incubator-devlake/plugins/jira/models"
-
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -30,7 +28,6 @@ var _ core.SubTaskEntryPoint = ExtractWorklogs
 
 func ExtractWorklogs(taskCtx core.SubTaskContext) error {
 	data := taskCtx.GetData().(*JiraTaskData)
-	db := taskCtx.GetDb()
 	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
 		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
 			Ctx: taskCtx,
@@ -46,17 +43,12 @@ func ExtractWorklogs(taskCtx core.SubTaskContext) error {
 			if err != nil {
 				return nil, err
 			}
-			issue := &models.JiraIssue{ConnectionId: data.Connection.ID, IssueId: input.IssueId}
-			err = db.Model(issue).Update("worklog_updated", input.UpdateTime).Error
-			if err != nil {
-				return nil, err
-			}
 			var worklog apiv2models.Worklog
 			err = json.Unmarshal(row.Data, &worklog)
 			if err != nil {
 				return nil, err
 			}
-			return []interface{}{worklog.ToToolLayer(data.Connection.ID)}, nil
+			return []interface{}{worklog.ToToolLayer(data.Connection.ID, &input.UpdateTime)}, nil
 		},
 	})