You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by li...@apache.org on 2023/02/15 09:26:25 UTC

[incubator-devlake] branch release-v0.15 updated: feat: jira supports timefilter by updated_at (#4408)

This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.15 by this push:
     new 1e4f03d0d feat: jira supports timefilter by updated_at (#4408)
1e4f03d0d is described below

commit 1e4f03d0dee2f8aa971261d37257f67fff348cdc
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Feb 15 17:26:19 2023 +0800

    feat: jira supports timefilter by updated_at (#4408)
    
    * feat: jira supports timefilter by updated_date
    
    * fix: createdDateAfter follow previous logic
    
    * fix: remove redundant condition
    
    * refactor: rename UpdatedDateAfter to TimeAfter
---
 models/collector_state.go                          |  1 +
 ...3_add_updated_date_after_to_collector_state.go} | 31 ++++++++++----
 models/migrationscripts/register.go                |  1 +
 plugins/core/plugin_blueprint.go                   |  9 ++--
 plugins/helper/api_collector_with_state.go         | 48 ++++++++++++++--------
 plugins/jira/impl/impl.go                          | 25 ++++++-----
 plugins/jira/tasks/issue_collector.go              | 15 ++++---
 plugins/jira/tasks/task_data.go                    |  5 ++-
 8 files changed, 88 insertions(+), 47 deletions(-)

diff --git a/models/collector_state.go b/models/collector_state.go
index e0230a408..ee693e1a3 100644
--- a/models/collector_state.go
+++ b/models/collector_state.go
@@ -27,6 +27,7 @@ type CollectorLatestState struct {
 	RawDataParams      string    `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
 	RawDataTable       string    `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
 	CreatedDateAfter   *time.Time
+	TimeAfter          *time.Time
 	LatestSuccessStart *time.Time
 }
 
diff --git a/models/collector_state.go b/models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go
similarity index 52%
copy from models/collector_state.go
copy to models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go
index e0230a408..d487334fd 100644
--- a/models/collector_state.go
+++ b/models/migrationscripts/20230213_add_updated_date_after_to_collector_state.go
@@ -15,21 +15,34 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package models
+package migrationscripts
 
 import (
 	"time"
+
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/plugins/core"
 )
 
-type CollectorLatestState struct {
-	CreatedAt          time.Time `json:"createdAt"`
-	UpdatedAt          time.Time `json:"updatedAt"`
-	RawDataParams      string    `gorm:"primaryKey;column:raw_data_params;type:varchar(255);index" json:"raw_data_params"`
-	RawDataTable       string    `gorm:"primaryKey;column:raw_data_table;type:varchar(255)" json:"raw_data_table"`
-	CreatedDateAfter   *time.Time
-	LatestSuccessStart *time.Time
+type collectorLatestState20230213 struct {
+	TimeAfter *time.Time
 }
 
-func (CollectorLatestState) TableName() string {
+func (collectorLatestState20230213) TableName() string {
 	return "_devlake_collector_latest_state"
 }
+
+type addTimeAfterToCollectorMeta20230213 struct{}
+
+func (script *addTimeAfterToCollectorMeta20230213) Up(basicRes core.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(basicRes, &collectorLatestState20230213{})
+}
+
+func (*addTimeAfterToCollectorMeta20230213) Version() uint64 {
+	return 20230213200039
+}
+
+func (*addTimeAfterToCollectorMeta20230213) Name() string {
+	return "add time_after to _devlake_collector_latest_state"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index e8a1b132c..ec982a8e4 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -69,5 +69,6 @@ func All() []core.MigrationScript {
 		new(encryptTask221221),
 		new(renameProjectMetrics),
 		new(addOriginalTypeToIssue221230),
+		new(addTimeAfterToCollectorMeta20230213),
 	}
 }
diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go
index 11b2e6962..d05d0929f 100644
--- a/plugins/core/plugin_blueprint.go
+++ b/plugins/core/plugin_blueprint.go
@@ -19,8 +19,9 @@ package core
 
 import (
 	"encoding/json"
-	"github.com/apache/incubator-devlake/errors"
 	"time"
+
+	"github.com/apache/incubator-devlake/errors"
 )
 
 // PipelineTask represents a smallest unit of execution inside a PipelinePlan
@@ -172,7 +173,9 @@ type CompositePluginBlueprintV200 interface {
 }
 
 type BlueprintSyncPolicy struct {
-	Version          string     `json:"version" validate:"required,semver,oneof=1.0.0"`
-	SkipOnFail       bool       `json:"skipOnFail"`
+	Version    string `json:"version" validate:"required,semver,oneof=1.0.0"`
+	SkipOnFail bool   `json:"skipOnFail"`
+	// Deprecating: CreatedDateAfter is kept as a fallback; TimeAfter takes priority when both are set
 	CreatedDateAfter *time.Time `json:"createdDateAfter"`
+	TimeAfter        *time.Time `json:"timeAfter"`
 }
diff --git a/plugins/helper/api_collector_with_state.go b/plugins/helper/api_collector_with_state.go
index c8a7a648a..5c25bbd87 100644
--- a/plugins/helper/api_collector_with_state.go
+++ b/plugins/helper/api_collector_with_state.go
@@ -30,13 +30,15 @@ type ApiCollectorStateManager struct {
 	RawDataSubTaskArgs
 	*ApiCollector
 	*GraphqlCollector
-	LatestState      models.CollectorLatestState
+	LatestState models.CollectorLatestState
+	// Deprecating: CreatedDateAfter is kept as a fallback; TimeAfter takes priority when set
 	CreatedDateAfter *time.Time
+	TimeAfter        *time.Time
 	ExecuteStart     time.Time
 }
 
-// NewApiCollectorWithState create a new ApiCollectorStateManager
-func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+// NewApiCollectorWithStateEx creates a new ApiCollectorStateManager that records both createdDateAfter and timeAfter
+func NewApiCollectorWithStateEx(args RawDataSubTaskArgs, createdDateAfter *time.Time, timeAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
 	db := args.Ctx.GetDal()
 
 	rawDataSubTask, err := NewRawDataSubTask(args)
@@ -59,18 +61,28 @@ func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Ti
 		RawDataSubTaskArgs: args,
 		LatestState:        latestState,
 		CreatedDateAfter:   createdDateAfter,
+		TimeAfter:          timeAfter,
 		ExecuteStart:       time.Now(),
 	}, nil
 }
 
-// IsIncremental return if the old data can support collect incrementally.
-// only when latest collection is success &&
-// (m.LatestState.CreatedDateAfter == nil means all data have been collected ||
-// CreatedDateAfter at this time exists and no before than in the LatestState)
-// if CreatedDateAfter at this time not exists, collect incrementally only when "m.LatestState.CreatedDateAfter == nil"
-func (m ApiCollectorStateManager) IsIncremental() bool {
-	return m.LatestState.LatestSuccessStart != nil &&
-		(m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter))
+// NewApiCollectorWithState creates a new ApiCollectorStateManager with createdDateAfter only (timeAfter is nil)
+func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error) {
+	return NewApiCollectorWithStateEx(args, createdDateAfter, nil)
+}
+
+// IsIncremental indicates if the collector should operate in incremental mode
+func (m *ApiCollectorStateManager) IsIncremental() bool {
+	// the initial collection
+	if m.LatestState.LatestSuccessStart == nil {
+		return false
+	}
+	// prioritize TimeAfter parameter: collector should filter data by `updated_date`
+	if m.TimeAfter != nil {
+		return m.LatestState.TimeAfter == nil || !m.TimeAfter.Before(*m.LatestState.TimeAfter)
+	}
+	// fallback to CreatedDateAfter: collector should filter data by `created_date`
+	return m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter)
 }
 
 // InitCollector init the embedded collector
@@ -94,11 +106,7 @@ func (m ApiCollectorStateManager) Execute() errors.Error {
 		return err
 	}
 
-	db := m.Ctx.GetDal()
-	m.LatestState.LatestSuccessStart = &m.ExecuteStart
-	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
-	err = db.CreateOrUpdate(&m.LatestState)
-	return err
+	return m.updateState()
 }
 
 // ExecuteGraphQL the embedded collector and record execute state
@@ -108,9 +116,13 @@ func (m ApiCollectorStateManager) ExecuteGraphQL() errors.Error {
 		return err
 	}
 
+	return m.updateState()
+}
+
+func (m ApiCollectorStateManager) updateState() errors.Error {
 	db := m.Ctx.GetDal()
 	m.LatestState.LatestSuccessStart = &m.ExecuteStart
 	m.LatestState.CreatedDateAfter = m.CreatedDateAfter
-	err = db.CreateOrUpdate(&m.LatestState)
-	return err
+	m.LatestState.TimeAfter = m.TimeAfter
+	return db.CreateOrUpdate(&m.LatestState)
 }
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 4ad750086..6b56ee326 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -219,14 +219,6 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
 		}
 	}
 
-	var createdDateAfter time.Time
-	if op.CreatedDateAfter != "" {
-		createdDateAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
-		if err != nil {
-			return nil, errors.BadInput.Wrap(err, "invalid value for `createdDateAfter`")
-		}
-	}
-
 	info, code, err := tasks.GetJiraServerInfo(jiraApiClient)
 	if err != nil || code != http.StatusOK || info == nil {
 		return nil, errors.HttpStatus(code).Wrap(err, "fail to get Jira server info")
@@ -236,11 +228,24 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
 		ApiClient:      jiraApiClient,
 		JiraServerInfo: *info,
 	}
-	if !createdDateAfter.IsZero() {
+	if op.CreatedDateAfter != "" {
+		var createdDateAfter time.Time
+		createdDateAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
+		if err != nil {
+			return nil, errors.BadInput.Wrap(err, "invalid value for `createdDateAfter`")
+		}
 		taskData.CreatedDateAfter = &createdDateAfter
 		logger.Debug("collect data created from %s", createdDateAfter)
 	}
-
+	if op.TimeAfter != "" {
+		var timeAfter time.Time
+		timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
+		if err != nil {
+			return nil, errors.BadInput.Wrap(err, "invalid value for `timeAfter`")
+		}
+		taskData.TimeAfter = &timeAfter
+		logger.Debug("collect data created from %s", timeAfter)
+	}
 	return taskData, nil
 }
 
diff --git a/plugins/jira/tasks/issue_collector.go b/plugins/jira/tasks/issue_collector.go
index 332ffa12b..edec1b8e7 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -44,7 +44,7 @@ var CollectIssuesMeta = core.SubTaskMeta{
 func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
 	data := taskCtx.GetData().(*JiraTaskData)
 
-	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+	collectorWithState, err := helper.NewApiCollectorWithStateEx(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
 		/*
 			This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal
@@ -58,7 +58,7 @@ func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
 			Table store raw data
 		*/
 		Table: RAW_ISSUE_TABLE,
-	}, data.CreatedDateAfter)
+	}, data.CreatedDateAfter, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -66,13 +66,16 @@ func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
 	// build jql
 	// IMPORTANT: we have to keep paginated data in a consistence order to avoid data-missing, if we sort issues by
 	//  `updated`, issue will be jumping between pages if it got updated during the collection process
-	createdDateAfter := data.CreatedDateAfter
 	jql := "created is not null ORDER BY created ASC"
-	if createdDateAfter != nil {
-		// prepend a time range criteria if `since` was specified, either by user or from database
-		jql = fmt.Sprintf("created >= '%v' AND %v", createdDateAfter.Format("2006/01/02 15:04"), jql)
+
+	// time filter
+	if data.TimeAfter != nil {
+		jql = fmt.Sprintf("updated >= '%v' AND %v", data.TimeAfter.Format("2006/01/02 15:04"), jql)
+	} else if data.CreatedDateAfter != nil {
+		jql = fmt.Sprintf("created >= '%v' AND %v", data.CreatedDateAfter.Format("2006/01/02 15:04"), jql)
 	}
 
+	// diff sync
 	incremental := collectorWithState.IsIncremental()
 	if incremental {
 		jql = fmt.Sprintf("updated >= '%v' AND %v", collectorWithState.LatestState.LatestSuccessStart.Format("2006/01/02 15:04"), jql)
diff --git a/plugins/jira/tasks/task_data.go b/plugins/jira/tasks/task_data.go
index 866accb65..5d9ec4ca9 100644
--- a/plugins/jira/tasks/task_data.go
+++ b/plugins/jira/tasks/task_data.go
@@ -20,9 +20,10 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-devlake/errors"
 	"time"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 )
@@ -95,6 +96,7 @@ type JiraOptions struct {
 	ConnectionId         uint64 `json:"connectionId"`
 	BoardId              uint64 `json:"boardId"`
 	CreatedDateAfter     string
+	TimeAfter            string
 	TransformationRules  *JiraTransformationRule `json:"transformationRules"`
 	ScopeId              string
 	TransformationRuleId uint64
@@ -104,6 +106,7 @@ type JiraTaskData struct {
 	Options          *JiraOptions
 	ApiClient        *helper.ApiAsyncClient
 	CreatedDateAfter *time.Time
+	TimeAfter        *time.Time
 	JiraServerInfo   models.JiraServerInfo
 }