You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2022/06/15 09:28:23 UTC
[incubator-devlake] branch main updated: Nested Resource Incremental Collection (#2191)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 102d5f7a Nested Resource Incremental Collection (#2191)
102d5f7a is described below
commit 102d5f7a3f713e239e4d9f0207682cd4257e65b3
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Jun 15 17:28:18 2022 +0800
Nested Resource Incremental Collection (#2191)
* fix: ignore '.so' files when loading plugins
* fix: missing mock for method `Nested`
* fix: worker errors were not caught
* fix: worker_scheduler wouldn't halt on error
* refactor: replace `GetDb` with `GetDal`
Closes #2183
* fix: unit test
* refactor: nested resource incremental collection
Closes #2189
* fix: older changelogs wouldn't be collected
* refactor: unify changelog.issue_updated values
---
impl/dalgorm/dalgorm.go | 4 ++
plugins/core/dal/dal.go | 16 +++++-
plugins/jira/models/changelog.go | 3 +-
plugins/jira/models/connection.go | 21 --------
plugins/jira/models/issue.go | 1 -
.../migrationscripts/updateSchemas20220615.go | 62 ++++++++++++++++++++++
plugins/jira/tasks/changelog_collector.go | 39 ++++++++------
plugins/jira/tasks/changelog_extractor.go | 14 ++---
plugins/jira/tasks/issue_extractor.go | 60 +++++++++++----------
plugins/jira/tasks/task_data.go | 15 ++++--
10 files changed, 156 insertions(+), 79 deletions(-)
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index b063f863..7f471982 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -61,6 +61,10 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
}
case dal.SelectClause:
tx = tx.Select(d.(string))
+ case dal.GroupbyClause:
+ tx = tx.Group(d.(string))
+ case dal.HavingClause:
+ tx = tx.Having(d.(dal.DalClause).Expr, d.(dal.DalClause).Params...)
}
}
return tx
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 5f97df5a..bf8bfaea 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -109,7 +109,21 @@ func Select(fields string) Clause {
const OrderbyClause string = "OrderBy"
-// Orderby creates a new Orderby
+// Orderby creates a new Orderby clause
func Orderby(expr string) Clause {
return Clause{Type: OrderbyClause, Data: expr}
}
+
+const GroupbyClause string = "GroupBy"
+
+// Groupby creates a new Groupby clause
+func Groupby(expr string) Clause {
+ return Clause{Type: GroupbyClause, Data: expr}
+}
+
+const HavingClause string = "Having"
+
+// Having creates a new Having clause
+func Having(clause string, params ...interface{}) Clause {
+ return Clause{Type: HavingClause, Data: DalClause{clause, params}}
+}
diff --git a/plugins/jira/models/changelog.go b/plugins/jira/models/changelog.go
index 301197cd..bd965056 100644
--- a/plugins/jira/models/changelog.go
+++ b/plugins/jira/models/changelog.go
@@ -33,7 +33,8 @@ type JiraChangelog struct {
AuthorAccountId string `gorm:"type:varchar(255)"`
AuthorDisplayName string `gorm:"type:varchar(255)"`
AuthorActive bool
- Created time.Time `gorm:"index"`
+ Created time.Time `gorm:"index"`
+ IssueUpdated *time.Time `comment:"corresponding issue.updated time, changelog might need update IFF changelog.issue_updated < issue.updated"`
}
type JiraChangelogItem struct {
diff --git a/plugins/jira/models/connection.go b/plugins/jira/models/connection.go
index 1a4b7a14..fea93aaf 100644
--- a/plugins/jira/models/connection.go
+++ b/plugins/jira/models/connection.go
@@ -48,24 +48,3 @@ type JiraConnection struct {
func (JiraConnection) TableName() string {
return "_tool_jira_connections"
}
-
-type JiraIssueTypeMapping struct {
- ConnectionID uint64 `gorm:"primaryKey" json:"jiraConnectionId" validate:"required"`
- UserType string `gorm:"type:varchar(50);primaryKey" json:"userType" validate:"required"`
- StandardType string `gorm:"type:varchar(50)" json:"standardType" validate:"required"`
-}
-
-func (JiraIssueTypeMapping) TableName() string {
- return "_tool_jira_issue_type_mappings"
-}
-
-type JiraIssueStatusMapping struct {
- ConnectionID uint64 `gorm:"primaryKey" json:"jiraConnectionId" validate:"required"`
- UserType string `gorm:"type:varchar(50);primaryKey" json:"userType" validate:"required"`
- UserStatus string `gorm:"type:varchar(50);primaryKey" json:"userStatus" validate:"required"`
- StandardStatus string `gorm:"type:varchar(50)" json:"standardStatus" validate:"required"`
-}
-
-func (JiraIssueStatusMapping) TableName() string {
- return "_tool_jira_issue_status_mappings"
-}
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index a727099b..ae74e3ea 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -64,7 +64,6 @@ type JiraIssue struct {
AllFields datatypes.JSONMap
// internal status tracking
- ChangelogUpdated *time.Time
RemotelinkUpdated *time.Time
WorklogUpdated *time.Time
common.NoPKModel
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220615.go b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
new file mode 100644
index 00000000..f374ede6
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "context"
+ "time"
+
+ "gorm.io/gorm"
+)
+
+type UpdateSchemas20220615 struct {
+}
+
+type JiraIssue20220615 struct{}
+
+func (JiraIssue20220615) TableName() string {
+ return "_tool_jira_issues"
+}
+
+type JiraChangelog20220615 struct {
+ IssueUpdated *time.Time
+}
+
+func (JiraChangelog20220615) TableName() string {
+ return "_tool_jira_changelogs"
+}
+
+func (*UpdateSchemas20220615) Up(ctx context.Context, db *gorm.DB) error {
+ var err error
+ err = db.Migrator().DropColumn(&JiraIssue20220615{}, "changelog_updated")
+ if err != nil {
+ return err
+ }
+ err = db.Migrator().AutoMigrate(&JiraChangelog20220615{})
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (*UpdateSchemas20220615) Version() uint64 {
+ return 20220601154646
+}
+
+func (*UpdateSchemas20220615) Name() string {
+ return "replace issues.changelog_updated with changelogs.issue_updated"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index 22030f7d..ebbb9c44 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,7 +25,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
- . "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -40,26 +40,31 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
return nil
}
+ log := taskCtx.GetLogger()
db := taskCtx.GetDal()
- // figure out the time range
- since := data.Since
- // filter out issue_ids that needed collection
- clauses := []Clause{
- Select("bi.issue_id, NOW() AS update_time"),
- From("_tool_jira_board_issues bi"),
- Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
- Where(
- `bi.connection_id = ?
- AND bi.board_id = ?
- AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)`,
- data.Options.ConnectionId,
- data.Options.BoardId,
- ),
+ // query for issue_ids that needed changelog collection
+ clauses := []dal.Clause{
+ dal.Select("i.issue_id, i.updated AS update_time"),
+ dal.From("_tool_jira_board_issues bi"),
+ dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+ dal.Join("LEFT JOIN _tool_jira_changelogs c ON (c.connection_id = i.connection_id AND c.issue_id = i.issue_id)"),
+ dal.Where(`i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? `, data.Options.ConnectionId, data.Options.BoardId),
+ dal.Groupby("i.issue_id, i.updated"),
+ dal.Having("i.updated > max(c.issue_updated) OR max(c.issue_updated) IS NULL"),
}
// apply time range if any
+ since := data.Since
if since != nil {
- clauses = append(clauses, Where("i.updated > ?", *since))
+ clauses = append(clauses, dal.Where("i.updated > ?", *since))
+ }
+
+ if log.IsLevelEnabled(core.LOG_DEBUG) {
+ count, err := db.Count(clauses...)
+ if err != nil {
+ return err
+ }
+ log.Debug("total number of issues to collect for: %d", count)
}
// construct the input iterator
@@ -85,7 +90,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
},
ApiClient: data.ApiClient,
PageSize: 100,
- Incremental: true,
+ Incremental: since == nil,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
UrlTemplate: "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/plugins/jira/tasks/changelog_extractor.go b/plugins/jira/tasks/changelog_extractor.go
index d46097f2..435cf501 100644
--- a/plugins/jira/tasks/changelog_extractor.go
+++ b/plugins/jira/tasks/changelog_extractor.go
@@ -33,7 +33,6 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
return nil
}
- db := taskCtx.GetDb()
connectionId := data.Connection.ID
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
@@ -45,24 +44,25 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
Table: RAW_CHANGELOG_TABLE,
},
Extract: func(row *helper.RawData) ([]interface{}, error) {
+ // process input
var input apiv2models.Input
err := json.Unmarshal(row.Input, &input)
if err != nil {
return nil, err
}
- var result []interface{}
var changelog apiv2models.Changelog
err = json.Unmarshal(row.Data, &changelog)
if err != nil {
return nil, err
}
- issue := &models.JiraIssue{ConnectionId: connectionId, IssueId: input.IssueId}
- err = db.Model(issue).Update("changelog_updated", input.UpdateTime).Error
- if err != nil {
- return nil, err
- }
+ // prepare output
+ var result []interface{}
cl, user := changelog.ToToolLayer(connectionId, input.IssueId)
+ // this is crucial for incremental update
+ cl.IssueUpdated = &input.UpdateTime
+ // collect changelog / user information
result = append(result, cl, user)
+ // collect changelog_items
for _, item := range changelog.Items {
result = append(result, item.ToToolLayer(connectionId, changelog.ID))
for _, u := range item.ExtractUser(connectionId) {
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index f9b2a15d..3c2cee56 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -19,9 +19,11 @@ package tasks
import (
"encoding/json"
- "fmt"
+ "strconv"
"strings"
+ "time"
+ "github.com/apache/incubator-devlake/models/domainlayer/ticket"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -34,18 +36,19 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
connectionId := data.Connection.ID
boardId := data.Options.BoardId
- db := taskCtx.GetDb()
logger := taskCtx.GetLogger()
logger.Info("extract Issues, connection_id=%d, board_id=%d", connectionId, boardId)
// prepare getStdType function
- var typeMappingRows []*models.JiraIssueTypeMapping
- err := db.Find(&typeMappingRows, "connection_id = ?", connectionId).Error
- if err != nil {
- return err
- }
+ // TODO: implement type mapping
typeMappings := make(map[string]string)
- for _, typeMappingRow := range typeMappingRows {
- typeMappings[typeMappingRow.UserType] = typeMappingRow.StandardType
+ for _, userType := range data.Options.IssueExtraction.RequirementTypeMapping {
+ typeMappings[userType] = "REQUIREMENT"
+ }
+ for _, userType := range data.Options.IssueExtraction.BugTypeMapping {
+ typeMappings[userType] = "BUG"
+ }
+ for _, userType := range data.Options.IssueExtraction.IncidentTypeMapping {
+ typeMappings[userType] = "INCIDENT"
}
getStdType := func(userType string) string {
stdType := typeMappings[userType]
@@ -54,20 +57,14 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
}
return strings.ToUpper(stdType)
}
- // prepare getStdStatus function
- // TODO: status mapping is now not used
- var statusMappingRows []*models.JiraIssueStatusMapping
- err = db.Find(&statusMappingRows, "connection_id = ?", connectionId).Error
- if err != nil {
- return err
- }
- statusMappings := make(map[string]string)
- makeStatusMappingKey := func(userType string, userStatus string) string {
- return fmt.Sprintf("%v:%v", userType, userStatus)
- }
- for _, statusMappingRow := range statusMappingRows {
- k := makeStatusMappingKey(statusMappingRow.UserType, statusMappingRow.UserStatus)
- statusMappings[k] = statusMappingRow.StandardStatus
+ getStdStatus := func(statusKey string) string {
+ if statusKey == "done" {
+ return ticket.DONE
+ } else if statusKey == "new" {
+ return ticket.TODO
+ } else {
+ return ticket.IN_PROGRESS
+ }
}
extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
@@ -111,17 +108,26 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
if issue.ResolutionDate != nil {
issue.LeadTimeMinutes = uint(issue.ResolutionDate.Unix()-issue.Created.Unix()) / 60
}
+ if data.Options.IssueExtraction.StoryPointField != "" {
+ strStoryPoint := apiIssue.Fields.AllFields[data.Options.IssueExtraction.StoryPointField].(string)
+ issue.StoryPoint, _ = strconv.ParseFloat(strStoryPoint, 32)
+ }
issue.StdStoryPoint = uint(issue.StoryPoint)
issue.StdType = getStdType(issue.Type)
- issue.StdStatus = GetStdStatus(issue.StatusKey)
- if len(changelogs) < 100 {
- issue.ChangelogUpdated = &row.CreatedAt
- }
+ issue.StdStatus = getStdStatus(issue.StatusKey)
results = append(results, issue)
for _, worklog := range worklogs {
results = append(results, worklog)
}
+ var issueUpdated *time.Time
+ // likely this issue has more changelogs to be collected
+ if len(changelogs) == 100 {
+ issueUpdated = nil
+ } else {
+ issueUpdated = &issue.Updated
+ }
for _, changelog := range changelogs {
+ changelog.IssueUpdated = issueUpdated
results = append(results, changelog)
}
for _, changelogItem := range changelogItems {
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index 7b3d6b8c..23cc2957 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -25,10 +25,17 @@ import (
)
type JiraOptions struct {
- ConnectionId uint64 `json:"connectionId"`
- BoardId uint64 `json:"boardId"`
- Tasks []string `json:"tasks,omitempty"`
- Since string
+ ConnectionId uint64 `json:"connectionId"`
+ BoardId uint64 `json:"boardId"`
+ Tasks []string `json:"tasks,omitempty"`
+ Since string
+ IssueExtraction struct {
+ RequirementTypeMapping []string
+ BugTypeMapping []string
+ IncidentTypeMapping []string
+ //EpicKeyField string
+ StoryPointField string
+ }
}
type JiraTaskData struct {