You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ma...@apache.org on 2023/02/23 03:13:41 UTC

[incubator-devlake] branch main updated: refactor: gitlab adopt timeAfter option (#4487)

This is an automated email from the ASF dual-hosted git repository.

mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fb0487f4 refactor: gitlab adopt timeAfter option (#4487)
4fb0487f4 is described below

commit 4fb0487f4184b5b34e06380ac55fc89207c36890
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Thu Feb 23 11:13:37 2023 +0800

    refactor: gitlab adopt timeAfter option (#4487)
---
 backend/plugins/gitlab/api/blueprint_v200.go              |  4 ++--
 backend/plugins/gitlab/gitlab.go                          |  4 ++--
 backend/plugins/gitlab/impl/impl.go                       | 14 +++++++-------
 backend/plugins/gitlab/tasks/issue_collector.go           | 15 ++++++++-------
 backend/plugins/gitlab/tasks/job_collector.go             |  2 +-
 backend/plugins/gitlab/tasks/mr_collector.go              | 13 +++++++------
 backend/plugins/gitlab/tasks/mr_commit_collector.go       |  2 +-
 backend/plugins/gitlab/tasks/mr_detail_collector.go       |  8 +++++---
 backend/plugins/gitlab/tasks/mr_note_collector.go         |  2 +-
 backend/plugins/gitlab/tasks/pipeline_collector.go        |  7 +++++--
 backend/plugins/gitlab/tasks/pipeline_detail_collector.go |  6 +++---
 backend/plugins/gitlab/tasks/shared.go                    | 10 +++++-----
 backend/plugins/gitlab/tasks/task_data.go                 | 10 +++++-----
 13 files changed, 52 insertions(+), 45 deletions(-)

diff --git a/backend/plugins/gitlab/api/blueprint_v200.go b/backend/plugins/gitlab/api/blueprint_v200.go
index a19ee9c50..477cce1ad 100644
--- a/backend/plugins/gitlab/api/blueprint_v200.go
+++ b/backend/plugins/gitlab/api/blueprint_v200.go
@@ -145,8 +145,8 @@ func makePipelinePlanV200(
 		options["connectionId"] = connection.ID
 		options["projectId"] = intScopeId
 		options["transformationRuleId"] = transformationRules.ID
-		if syncPolicy.CreatedDateAfter != nil {
-			options["createdDateAfter"] = syncPolicy.CreatedDateAfter.Format(time.RFC3339)
+		if syncPolicy.TimeAfter != nil {
+			options["timeAfter"] = syncPolicy.TimeAfter.Format(time.RFC3339)
 		}
 
 		// construct subtasks
diff --git a/backend/plugins/gitlab/gitlab.go b/backend/plugins/gitlab/gitlab.go
index 4641109e8..a45b0aa27 100644
--- a/backend/plugins/gitlab/gitlab.go
+++ b/backend/plugins/gitlab/gitlab.go
@@ -31,7 +31,7 @@ func main() {
 	cmd := &cobra.Command{Use: "gitlab"}
 	projectId := cmd.Flags().IntP("project-id", "p", 0, "gitlab project id")
 	connectionId := cmd.Flags().Uint64P("connection-id", "c", 0, "gitlab connection id")
-	CreatedDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "", "collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
+	timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
 	_ = cmd.MarkFlagRequired("project-id")
 	_ = cmd.MarkFlagRequired("connection-id")
 
@@ -50,7 +50,7 @@ func main() {
 		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
 			"projectId":            *projectId,
 			"connectionId":         *connectionId,
-			"createdDateAfter":     *CreatedDateAfter,
+			"timeAfter":            *timeAfter,
 			"prType":               *prType,
 			"prComponent":          *prComponent,
 			"prBodyClosePattern":   *prBodyClosePattern,
diff --git a/backend/plugins/gitlab/impl/impl.go b/backend/plugins/gitlab/impl/impl.go
index 7847a2965..9bcd8abf2 100644
--- a/backend/plugins/gitlab/impl/impl.go
+++ b/backend/plugins/gitlab/impl/impl.go
@@ -164,11 +164,11 @@ func (p Gitlab) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]i
 		return nil, err
 	}
 
-	var createdDateAfter time.Time
-	if op.CreatedDateAfter != "" {
-		createdDateAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
+	var timeAfter time.Time
+	if op.TimeAfter != "" {
+		timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
 		if err != nil {
-			return nil, errors.BadInput.Wrap(err, "invalid value for `createdDateAfter`")
+			return nil, errors.BadInput.Wrap(err, "invalid value for `timeAfter`")
 		}
 	}
 
@@ -215,9 +215,9 @@ func (p Gitlab) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]i
 		ApiClient: apiClient,
 	}
 
-	if !createdDateAfter.IsZero() {
-		taskData.CreatedDateAfter = &createdDateAfter
-		logger.Debug("collect data updated createdDateAfter %s", createdDateAfter)
+	if !timeAfter.IsZero() {
+		taskData.TimeAfter = &timeAfter
+		logger.Debug("collect data updated timeAfter %s", timeAfter)
 	}
 	return &taskData, nil
 }
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go b/backend/plugins/gitlab/tasks/issue_collector.go
index e3d757712..bb7f017d2 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -20,12 +20,13 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/plugin"
-	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"net/http"
 	"net/url"
 	"time"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
 const RAW_ISSUE_TABLE = "gitlab_api_issues"
@@ -40,7 +41,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
 
 func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_ISSUE_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -57,12 +58,12 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
 		*/
 		Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
 			query := url.Values{}
+			if collectorWithState.TimeAfter != nil {
+				query.Set("updated_after", collectorWithState.TimeAfter.Format(time.RFC3339))
+			}
 			if incremental {
 				query.Set("updated_after", collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
 			}
-			if collectorWithState.CreatedDateAfter != nil {
-				query.Set("created_after", collectorWithState.CreatedDateAfter.Format(time.RFC3339))
-			}
 			query.Set("sort", "asc")
 			query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
 			query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
diff --git a/backend/plugins/gitlab/tasks/job_collector.go b/backend/plugins/gitlab/tasks/job_collector.go
index 142f3c2fe..5700b173e 100644
--- a/backend/plugins/gitlab/tasks/job_collector.go
+++ b/backend/plugins/gitlab/tasks/job_collector.go
@@ -43,7 +43,7 @@ func CollectApiJobs(taskCtx plugin.SubTaskContext) errors.Error {
 		Incremental:        false,
 		UrlTemplate:        "projects/{{ .Params.ProjectId }}/jobs",
 		Query:              GetQuery,
-		ResponseParser:     GetRawMessageCreatedAtAfter(data.CreatedDateAfter),
+		ResponseParser:     GetRawMessageUpdatedAtAfter(data.TimeAfter),
 		AfterResponse:      ignoreHTTPStatus403, // ignore 403 for CI/CD disable
 	})
 
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go b/backend/plugins/gitlab/tasks/mr_collector.go
index 2471d7e6e..69dbc6c63 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -18,11 +18,12 @@ limitations under the License.
 package tasks
 
 import (
+	"net/url"
+	"time"
+
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"net/url"
-	"time"
 )
 
 const RAW_MERGE_REQUEST_TABLE = "gitlab_api_merge_requests"
@@ -37,7 +38,7 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -55,12 +56,12 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
 			if err != nil {
 				return nil, err
 			}
+			if collectorWithState.TimeAfter != nil {
+				query.Set("updated_after", collectorWithState.TimeAfter.Format(time.RFC3339))
+			}
 			if incremental {
 				query.Set("updated_after", collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
 			}
-			if collectorWithState.CreatedDateAfter != nil {
-				query.Set("created_after", collectorWithState.CreatedDateAfter.Format(time.RFC3339))
-			}
 			return query, nil
 		},
 	})
diff --git a/backend/plugins/gitlab/tasks/mr_commit_collector.go b/backend/plugins/gitlab/tasks/mr_commit_collector.go
index 3d70eef7d..2891f84c0 100644
--- a/backend/plugins/gitlab/tasks/mr_commit_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_commit_collector.go
@@ -35,7 +35,7 @@ var CollectApiMrCommitsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestsCommits(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_COMMITS_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 64a959bf8..e9cbe02e8 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -39,7 +39,7 @@ var CollectApiMergeRequestDetailsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestDetails(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_DETAIL_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -79,8 +79,10 @@ func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, collectorWith
 			data.Options.ProjectId, data.Options.ConnectionId, true,
 		),
 	}
-	if collectorWithState.CreatedDateAfter != nil {
-		clauses = append(clauses, dal.Where("gitlab_created_at > ?", *collectorWithState.CreatedDateAfter))
+	if collectorWithState.LatestState.LatestSuccessStart != nil {
+		clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
+	} else if collectorWithState.TimeAfter != nil {
+		clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.TimeAfter))
 	}
 	// construct the input iterator
 	cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go b/backend/plugins/gitlab/tasks/mr_note_collector.go
index ae6028b3c..2c39ad4a4 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -35,7 +35,7 @@ var CollectApiMrNotesMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestsNotes(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_NOTES_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go b/backend/plugins/gitlab/tasks/pipeline_collector.go
index d8a487f30..115aaf851 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -39,7 +39,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -58,8 +58,11 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
 		UrlTemplate:        "projects/{{ .Params.ProjectId }}/pipelines",
 		Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
 			query := url.Values{}
+			if collectorWithState.TimeAfter != nil {
+				query.Set("updated_after", collectorWithState.TimeAfter.Format(time.RFC3339))
+			}
 			if incremental {
-				query.Set("updated_after", collectorWithState.LatestState.LatestSuccessStart.String())
+				query.Set("updated_after", collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
 			}
 			query.Set("with_stats", "true")
 			query.Set("sort", "asc")
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index 4465df09e..923457f50 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -40,7 +40,7 @@ var CollectApiPipelineDetailsMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_DETAILS_TABLE)
-	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+	collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
 	if err != nil {
 		return err
 	}
@@ -91,8 +91,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *hel
 			data.Options.ProjectId, data.Options.ConnectionId, true,
 		),
 	}
-	if collectorWithState.CreatedDateAfter != nil {
-		clauses = append(clauses, dal.Where("gitlab_created_at > ?", *collectorWithState.CreatedDateAfter))
+	if collectorWithState.LatestState.LatestSuccessStart != nil {
+		clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
 	}
 	// construct the input iterator
 	cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/shared.go b/backend/plugins/gitlab/tasks/shared.go
index 0d94e3e1a..e8468d3a8 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -96,9 +96,9 @@ func GetOneRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors
 	return []json.RawMessage{rawMessage}, nil
 }
 
-func GetRawMessageCreatedAtAfter(createDateAfter *time.Time) func(res *http.Response) ([]json.RawMessage, errors.Error) {
+func GetRawMessageUpdatedAtAfter(timeAfter *time.Time) func(res *http.Response) ([]json.RawMessage, errors.Error) {
 	type ApiModel struct {
-		CreatedAt *helper.Iso8601Time `json:"created_at"`
+		UpdatedAt *helper.Iso8601Time `json:"updated_at"`
 	}
 
 	return func(res *http.Response) ([]json.RawMessage, errors.Error) {
@@ -114,7 +114,7 @@ func GetRawMessageCreatedAtAfter(createDateAfter *time.Time) func(res *http.Resp
 			if err != nil {
 				return nil, err
 			}
-			if createDateAfter == nil || createDateAfter.Before(apiModel.CreatedAt.ToTime()) {
+			if timeAfter == nil || timeAfter.Before(apiModel.UpdatedAt.ToTime()) {
 				// only finish when all items are created before `createDateAfter`
 				// because gitlab's order may not strict enough
 				isFinish = false
@@ -161,8 +161,8 @@ func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState
 			data.Options.ProjectId, data.Options.ConnectionId,
 		),
 	}
-	if collectorWithState.CreatedDateAfter != nil {
-		clauses = append(clauses, dal.Where("gitlab_created_at > ?", *collectorWithState.CreatedDateAfter))
+	if collectorWithState.LatestState.LatestSuccessStart != nil {
+		clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
 	}
 	// construct the input iterator
 	cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/task_data.go b/backend/plugins/gitlab/tasks/task_data.go
index e26e19d4f..9c0a5a67a 100644
--- a/backend/plugins/gitlab/tasks/task_data.go
+++ b/backend/plugins/gitlab/tasks/task_data.go
@@ -30,15 +30,15 @@ type GitlabOptions struct {
 	ProjectId                        int      `mapstructure:"projectId" json:"projectId"`
 	TransformationRuleId             uint64   `mapstructure:"transformationRuleId" json:"transformationRuleId"`
 	Tasks                            []string `mapstructure:"tasks" json:"tasks,omitempty"`
-	CreatedDateAfter                 string
+	TimeAfter                        string
 	*models.GitlabTransformationRule `mapstructure:"transformationRules" json:"transformationRules"`
 }
 
 type GitlabTaskData struct {
-	Options          *GitlabOptions
-	ApiClient        *helper.ApiAsyncClient
-	ProjectCommit    *models.GitlabProjectCommit
-	CreatedDateAfter *time.Time
+	Options       *GitlabOptions
+	ApiClient     *helper.ApiAsyncClient
+	ProjectCommit *models.GitlabProjectCommit
+	TimeAfter     *time.Time
 }
 
 func DecodeAndValidateTaskOptions(options map[string]interface{}) (*GitlabOptions, errors.Error) {