You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2022/06/15 09:28:23 UTC

[incubator-devlake] branch main updated: Nested Resource Incremental Collection (#2191)

This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 102d5f7a Nested Resource Incremental Collection (#2191)
102d5f7a is described below

commit 102d5f7a3f713e239e4d9f0207682cd4257e65b3
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Jun 15 17:28:18 2022 +0800

    Nested Resource Incremental Collection (#2191)
    
    * fix: ignore '.so' files when loading plugins
    
    * fix: missing mock for method `Nested`
    
    * fix: worker errors were not caught
    
    * fix: worker_scheduler wouldn't halt on error
    
    * refactor: replace `GetDb` with `GetDal`
    
      Closes #2183
    
    * fix: unit test
    
    * refactor: nested resource incremental collection
    
      Closes #2189
    
    * fix: older changelogs wouldn't be collected
    
    * refactor: unify changelog.issue_updated values
---
 impl/dalgorm/dalgorm.go                            |  4 ++
 plugins/core/dal/dal.go                            | 16 +++++-
 plugins/jira/models/changelog.go                   |  3 +-
 plugins/jira/models/connection.go                  | 21 --------
 plugins/jira/models/issue.go                       |  1 -
 .../migrationscripts/updateSchemas20220615.go      | 62 ++++++++++++++++++++++
 plugins/jira/tasks/changelog_collector.go          | 39 ++++++++------
 plugins/jira/tasks/changelog_extractor.go          | 14 ++---
 plugins/jira/tasks/issue_extractor.go              | 60 +++++++++++----------
 plugins/jira/tasks/task_data.go                    | 15 ++++--
 10 files changed, 156 insertions(+), 79 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index b063f863..7f471982 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -61,6 +61,10 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
 			}
 		case dal.SelectClause:
 			tx = tx.Select(d.(string))
+		case dal.GroupbyClause:
+			tx = tx.Group(d.(string))
+		case dal.HavingClause:
+			tx = tx.Having(d.(dal.DalClause).Expr, d.(dal.DalClause).Params...)
 		}
 	}
 	return tx
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 5f97df5a..bf8bfaea 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -109,7 +109,21 @@ func Select(fields string) Clause {
 
 const OrderbyClause string = "OrderBy"
 
-// Orderby creates a new Orderby
+// Orderby creates a new Orderby clause
 func Orderby(expr string) Clause {
 	return Clause{Type: OrderbyClause, Data: expr}
 }
+
+const GroupbyClause string = "GroupBy"
+
+// Groupby creates a new Groupby clause
+func Groupby(expr string) Clause {
+	return Clause{Type: GroupbyClause, Data: expr}
+}
+
+const HavingClause string = "Having"
+
+// Having creates a new Having clause with optional bind parameters
+func Having(clause string, params ...interface{}) Clause {
+	return Clause{Type: HavingClause, Data: DalClause{clause, params}}
+}
diff --git a/plugins/jira/models/changelog.go b/plugins/jira/models/changelog.go
index 301197cd..bd965056 100644
--- a/plugins/jira/models/changelog.go
+++ b/plugins/jira/models/changelog.go
@@ -33,7 +33,8 @@ type JiraChangelog struct {
 	AuthorAccountId   string `gorm:"type:varchar(255)"`
 	AuthorDisplayName string `gorm:"type:varchar(255)"`
 	AuthorActive      bool
-	Created           time.Time `gorm:"index"`
+	Created           time.Time  `gorm:"index"`
+	IssueUpdated      *time.Time `comment:"corresponding issue.updated time, changelog might need update IFF changelog.issue_updated < issue.updated"`
 }
 
 type JiraChangelogItem struct {
diff --git a/plugins/jira/models/connection.go b/plugins/jira/models/connection.go
index 1a4b7a14..fea93aaf 100644
--- a/plugins/jira/models/connection.go
+++ b/plugins/jira/models/connection.go
@@ -48,24 +48,3 @@ type JiraConnection struct {
 func (JiraConnection) TableName() string {
 	return "_tool_jira_connections"
 }
-
-type JiraIssueTypeMapping struct {
-	ConnectionID uint64 `gorm:"primaryKey" json:"jiraConnectionId" validate:"required"`
-	UserType     string `gorm:"type:varchar(50);primaryKey" json:"userType" validate:"required"`
-	StandardType string `gorm:"type:varchar(50)" json:"standardType" validate:"required"`
-}
-
-func (JiraIssueTypeMapping) TableName() string {
-	return "_tool_jira_issue_type_mappings"
-}
-
-type JiraIssueStatusMapping struct {
-	ConnectionID   uint64 `gorm:"primaryKey" json:"jiraConnectionId" validate:"required"`
-	UserType       string `gorm:"type:varchar(50);primaryKey" json:"userType" validate:"required"`
-	UserStatus     string `gorm:"type:varchar(50);primaryKey" json:"userStatus" validate:"required"`
-	StandardStatus string `gorm:"type:varchar(50)" json:"standardStatus" validate:"required"`
-}
-
-func (JiraIssueStatusMapping) TableName() string {
-	return "_tool_jira_issue_status_mappings"
-}
diff --git a/plugins/jira/models/issue.go b/plugins/jira/models/issue.go
index a727099b..ae74e3ea 100644
--- a/plugins/jira/models/issue.go
+++ b/plugins/jira/models/issue.go
@@ -64,7 +64,6 @@ type JiraIssue struct {
 	AllFields                datatypes.JSONMap
 
 	// internal status tracking
-	ChangelogUpdated  *time.Time
 	RemotelinkUpdated *time.Time
 	WorklogUpdated    *time.Time
 	common.NoPKModel
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220615.go b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
new file mode 100644
index 00000000..f374ede6
--- /dev/null
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220615.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"context"
+	"time"
+
+	"gorm.io/gorm"
+)
+
+type UpdateSchemas20220615 struct {
+}
+
+type JiraIssue20220615 struct{}
+
+func (JiraIssue20220615) TableName() string {
+	return "_tool_jira_issues"
+}
+
+type JiraChangelog20220615 struct {
+	IssueUpdated *time.Time
+}
+
+func (JiraChangelog20220615) TableName() string {
+	return "_tool_jira_changelogs"
+}
+
+// Up replaces the per-issue changelog_updated marker with the per-changelog
+// issue_updated column consulted by incremental changelog collection.
+func (*UpdateSchemas20220615) Up(ctx context.Context, db *gorm.DB) error {
+	// guarded so the script stays re-runnable once the column is already gone
+	if db.Migrator().HasColumn(&JiraIssue20220615{}, "changelog_updated") {
+		err := db.Migrator().DropColumn(&JiraIssue20220615{}, "changelog_updated")
+		if err != nil {
+			return err
+		}
+	}
+	// AutoMigrate only adds what is missing here: the nullable issue_updated column
+	return db.Migrator().AutoMigrate(&JiraChangelog20220615{})
+}
+
+func (*UpdateSchemas20220615) Version() uint64 {
+	return 20220615154646
+}
+
+func (*UpdateSchemas20220615) Name() string {
+	return "replace issues.changelog_updated with changelogs.issue_updated"
+}
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index 22030f7d..ebbb9c44 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,7 +25,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	. "github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -40,26 +40,31 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
 	if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
 		return nil
 	}
+	log := taskCtx.GetLogger()
 	db := taskCtx.GetDal()
-	// figure out the time range
-	since := data.Since
 
-	// filter out issue_ids that needed collection
-	clauses := []Clause{
-		Select("bi.issue_id, NOW() AS update_time"),
-		From("_tool_jira_board_issues bi"),
-		Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
-		Where(
-			`bi.connection_id = ?
-			   AND bi.board_id = ?
-			   AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)`,
-			data.Options.ConnectionId,
-			data.Options.BoardId,
-		),
+	// query for issue_ids whose changelogs are missing or older than the issue itself
+	clauses := []dal.Clause{
+		dal.Select("i.issue_id, i.updated AS update_time"),
+		dal.From("_tool_jira_board_issues bi"),
+		dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+		dal.Join("LEFT JOIN _tool_jira_changelogs c ON (c.connection_id = i.connection_id AND c.issue_id = i.issue_id)"),
+		dal.Where(`i.updated > i.created AND bi.connection_id = ?  AND bi.board_id = ?  `, data.Options.ConnectionId, data.Options.BoardId),
+		dal.Groupby("i.issue_id, i.updated"),
+		dal.Having("i.updated > max(c.issue_updated) OR  max(c.issue_updated) IS NULL"),
 	}
 	// apply time range if any
+	since := data.Since
 	if since != nil {
-		clauses = append(clauses, Where("i.updated > ?", *since))
+		clauses = append(clauses, dal.Where("i.updated > ?", *since))
+	}
+
+	if log.IsLevelEnabled(core.LOG_DEBUG) {
+		count, err := db.Count(clauses...)
+		if err != nil {
+			return err
+		}
+		log.Debug("total number of issues to collect changelogs for: %d", count)
 	}
 
 	// construct the input iterator
@@ -85,7 +90,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
 		},
 		ApiClient:     data.ApiClient,
 		PageSize:      100,
-		Incremental:   true,
+		Incremental:   since == nil,
 		GetTotalPages: GetTotalPagesFromResponse,
 		Input:         iterator,
 		UrlTemplate:   "api/3/issue/{{ .Input.IssueId }}/changelog",
diff --git a/plugins/jira/tasks/changelog_extractor.go b/plugins/jira/tasks/changelog_extractor.go
index d46097f2..435cf501 100644
--- a/plugins/jira/tasks/changelog_extractor.go
+++ b/plugins/jira/tasks/changelog_extractor.go
@@ -33,7 +33,6 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
 	if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
 		return nil
 	}
-	db := taskCtx.GetDb()
 	connectionId := data.Connection.ID
 	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
 		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
@@ -45,24 +44,25 @@ func ExtractChangelogs(taskCtx core.SubTaskContext) error {
 			Table: RAW_CHANGELOG_TABLE,
 		},
 		Extract: func(row *helper.RawData) ([]interface{}, error) {
+			// process input
 			var input apiv2models.Input
 			err := json.Unmarshal(row.Input, &input)
 			if err != nil {
 				return nil, err
 			}
-			var result []interface{}
 			var changelog apiv2models.Changelog
 			err = json.Unmarshal(row.Data, &changelog)
 			if err != nil {
 				return nil, err
 			}
-			issue := &models.JiraIssue{ConnectionId: connectionId, IssueId: input.IssueId}
-			err = db.Model(issue).Update("changelog_updated", input.UpdateTime).Error
-			if err != nil {
-				return nil, err
-			}
+			// prepare output
+			var result []interface{}
 			cl, user := changelog.ToToolLayer(connectionId, input.IssueId)
+			// this is crucial for incremental update
+			cl.IssueUpdated = &input.UpdateTime
+			// collect changelog / user information
 			result = append(result, cl, user)
+			// collect changelog_items
 			for _, item := range changelog.Items {
 				result = append(result, item.ToToolLayer(connectionId, changelog.ID))
 				for _, u := range item.ExtractUser(connectionId) {
diff --git a/plugins/jira/tasks/issue_extractor.go b/plugins/jira/tasks/issue_extractor.go
index f9b2a15d..3c2cee56 100644
--- a/plugins/jira/tasks/issue_extractor.go
+++ b/plugins/jira/tasks/issue_extractor.go
@@ -19,9 +19,11 @@ package tasks
 
 import (
 	"encoding/json"
-	"fmt"
+	"strconv"
 	"strings"
+	"time"
 
+	"github.com/apache/incubator-devlake/models/domainlayer/ticket"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
@@ -34,18 +36,19 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 	data := taskCtx.GetData().(*JiraTaskData)
 	connectionId := data.Connection.ID
 	boardId := data.Options.BoardId
-	db := taskCtx.GetDb()
 	logger := taskCtx.GetLogger()
 	logger.Info("extract Issues, connection_id=%d, board_id=%d", connectionId, boardId)
 	// prepare getStdType function
-	var typeMappingRows []*models.JiraIssueTypeMapping
-	err := db.Find(&typeMappingRows, "connection_id = ?", connectionId).Error
-	if err != nil {
-		return err
-	}
+	// TODO: implement type mapping
 	typeMappings := make(map[string]string)
-	for _, typeMappingRow := range typeMappingRows {
-		typeMappings[typeMappingRow.UserType] = typeMappingRow.StandardType
+	for _, userType := range data.Options.IssueExtraction.RequirementTypeMapping {
+		typeMappings[userType] = "REQUIREMENT"
+	}
+	for _, userType := range data.Options.IssueExtraction.BugTypeMapping {
+		typeMappings[userType] = "BUG"
+	}
+	for _, userType := range data.Options.IssueExtraction.IncidentTypeMapping {
+		typeMappings[userType] = "INCIDENT"
 	}
 	getStdType := func(userType string) string {
 		stdType := typeMappings[userType]
@@ -54,20 +57,14 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 		}
 		return strings.ToUpper(stdType)
 	}
-	// prepare getStdStatus function
-	// TODO: status mapping is now not used
-	var statusMappingRows []*models.JiraIssueStatusMapping
-	err = db.Find(&statusMappingRows, "connection_id = ?", connectionId).Error
-	if err != nil {
-		return err
-	}
-	statusMappings := make(map[string]string)
-	makeStatusMappingKey := func(userType string, userStatus string) string {
-		return fmt.Sprintf("%v:%v", userType, userStatus)
-	}
-	for _, statusMappingRow := range statusMappingRows {
-		k := makeStatusMappingKey(statusMappingRow.UserType, statusMappingRow.UserStatus)
-		statusMappings[k] = statusMappingRow.StandardStatus
+	getStdStatus := func(statusKey string) string {
+		if statusKey == "done" {
+			return ticket.DONE
+		} else if statusKey == "new" {
+			return ticket.TODO
+		} else {
+			return ticket.IN_PROGRESS
+		}
 	}
 
 	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
@@ -111,17 +108,30 @@ func ExtractIssues(taskCtx core.SubTaskContext) error {
 			if issue.ResolutionDate != nil {
 				issue.LeadTimeMinutes = uint(issue.ResolutionDate.Unix()-issue.Created.Unix()) / 60
 			}
+			// AllFields presumably comes from JSON decoding, where numbers arrive as float64;
+			// numeric strings are parsed too (an unparsable one yields 0), other values are ignored
+			switch sp := apiIssue.Fields.AllFields[data.Options.IssueExtraction.StoryPointField].(type) {
+			case float64:
+				issue.StoryPoint = sp
+			case string:
+				issue.StoryPoint, _ = strconv.ParseFloat(sp, 32)
+			}
 			issue.StdStoryPoint = uint(issue.StoryPoint)
 			issue.StdType = getStdType(issue.Type)
-			issue.StdStatus = GetStdStatus(issue.StatusKey)
-			if len(changelogs) < 100 {
-				issue.ChangelogUpdated = &row.CreatedAt
-			}
+			issue.StdStatus = getStdStatus(issue.StatusKey)
 			results = append(results, issue)
 			for _, worklog := range worklogs {
 				results = append(results, worklog)
 			}
+			var issueUpdated *time.Time
+			// likely this issue has more changelogs to be collected
+			if len(changelogs) == 100 {
+				issueUpdated = nil
+			} else {
+				issueUpdated = &issue.Updated
+			}
 			for _, changelog := range changelogs {
+				changelog.IssueUpdated = issueUpdated
 				results = append(results, changelog)
 			}
 			for _, changelogItem := range changelogItems {
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index 7b3d6b8c..23cc2957 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -25,10 +25,17 @@ import (
 )
 
 type JiraOptions struct {
-	ConnectionId uint64   `json:"connectionId"`
-	BoardId      uint64   `json:"boardId"`
-	Tasks        []string `json:"tasks,omitempty"`
-	Since        string
+	ConnectionId    uint64   `json:"connectionId"`
+	BoardId         uint64   `json:"boardId"`
+	Tasks           []string `json:"tasks,omitempty"`
+	Since           string
+	IssueExtraction struct {
+		RequirementTypeMapping []string
+		BugTypeMapping         []string
+		IncidentTypeMapping    []string
+		//EpicKeyField           string
+		StoryPointField string
+	}
 }
 
 type JiraTaskData struct {