You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2022/06/15 06:23:52 UTC

[incubator-devlake] branch main updated: refactor(gitlab): change db to dal (#2165)

This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 40fd9a0a refactor(gitlab): change db to dal (#2165)
40fd9a0a is described below

commit 40fd9a0a7ceb2cbc116a296d07222bd3b44c4359
Author: Warren Chen <yi...@merico.dev>
AuthorDate: Wed Jun 15 14:23:47 2022 +0800

    refactor(gitlab): change db to dal (#2165)
---
 impl/dalgorm/dalgorm.go                            |   6 +-
 plugins/gitlab/gitlab.go                           |  35 ++-----
 .../gitlab/models/migrationscripts/init_schema.go  |   2 +-
 plugins/gitlab/tasks/commit_convertor.go           |  18 ++--
 plugins/gitlab/tasks/issue_collector.go            |  39 ++------
 plugins/gitlab/tasks/issue_convertor.go            |  12 ++-
 plugins/gitlab/tasks/issue_label_convertor.go      |  18 ++--
 plugins/gitlab/tasks/mr_comment_convertor.go       |  15 ++-
 plugins/gitlab/tasks/mr_commit_convertor.go        |  17 +++-
 plugins/gitlab/tasks/mr_convertor.go               |  15 ++-
 plugins/gitlab/tasks/mr_enricher.go                | 109 +++++++++++++--------
 plugins/gitlab/tasks/note_convertor.go             |  15 ++-
 plugins/gitlab/tasks/project_convertor.go          |  11 ++-
 plugins/gitlab/tasks/shared.go                     |  30 +++---
 14 files changed, 182 insertions(+), 160 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 5fc897d0..b063f863 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -39,9 +39,9 @@ type Dalgorm struct {
 //}
 
 func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
-	for _, clause := range clauses {
-		t := clause.Type
-		d := clause.Data
+	for _, c := range clauses {
+		t := c.Type
+		d := c.Data
 		switch t {
 		case dal.JoinClause:
 			tx = tx.Joins(d.(dal.DalClause).Expr, d.(dal.DalClause).Params...)
diff --git a/plugins/gitlab/gitlab.go b/plugins/gitlab/gitlab.go
index 3f9ecc15..88ce9f10 100644
--- a/plugins/gitlab/gitlab.go
+++ b/plugins/gitlab/gitlab.go
@@ -18,10 +18,7 @@ limitations under the License.
 package main // must be main for plugin entry point
 
 import (
-	"github.com/apache/incubator-devlake/config"
-	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/plugins/gitlab/impl"
-	"github.com/apache/incubator-devlake/plugins/tapd/models"
 	"github.com/apache/incubator-devlake/runner"
 	"github.com/spf13/cobra"
 )
@@ -32,31 +29,13 @@ var PluginEntry impl.Gitlab //nolint
 // standalone mode for debugging
 func main() {
 	gitlabCmd := &cobra.Command{Use: "gitlab"}
-	gitlabCmd.Run = func(c *cobra.Command, args []string) {
-		cfg := config.GetConfig()
-		log := logger.Global.Nested(gitlabCmd.Use)
-		db, err := runner.NewGormDb(cfg, log)
-		if err != nil {
-			panic(err)
-		}
-		wsList := make([]*models.TapdWorkspace, 0)
-		err = db.First(&wsList, "parent_id = ?", 59169984).Error
-		if err != nil {
-			panic(err)
-		}
-		projectList := []uint64{63281714,
-			34276182,
-			46319043,
-			50328292,
-			63984859,
-			55805854,
-			38496185,
-		}
-		for _, v := range projectList {
-			runner.DirectRun(gitlabCmd, args, PluginEntry, map[string]interface{}{
-				"projectId": v,
-			})
-		}
+	projectId := gitlabCmd.Flags().IntP("project-id", "p", 0, "gitlab project id")
+
+	_ = gitlabCmd.MarkFlagRequired("project-id")
+	gitlabCmd.Run = func(cmd *cobra.Command, args []string) {
+		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
+			"projectId": *projectId,
+		})
 	}
 
 	runner.RunCmd(gitlabCmd)
diff --git a/plugins/gitlab/models/migrationscripts/init_schema.go b/plugins/gitlab/models/migrationscripts/init_schema.go
index a826d8aa..1f5cba27 100644
--- a/plugins/gitlab/models/migrationscripts/init_schema.go
+++ b/plugins/gitlab/models/migrationscripts/init_schema.go
@@ -103,7 +103,7 @@ func (*InitSchemas) Up(ctx context.Context, db *gorm.DB) error {
 	}
 	conn.Proxy = v.GetString("GITLAB_PROXY")
 	conn.RateLimit = v.GetInt("GITLAB_API_REQUESTS_PER_HOUR")
-	fmt.Println(conn.Endpoint)
+
 	err = db.Clauses(clause.OnConflict{DoNothing: true}).Create(conn).Error
 
 	if err != nil {
diff --git a/plugins/gitlab/tasks/commit_convertor.go b/plugins/gitlab/tasks/commit_convertor.go
index 34446b9f..8f55da94 100644
--- a/plugins/gitlab/tasks/commit_convertor.go
+++ b/plugins/gitlab/tasks/commit_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer/code"
@@ -35,18 +36,19 @@ var ConvertApiCommitsMeta = core.SubTaskMeta{
 }
 
 func ConvertApiCommits(taskCtx core.SubTaskContext) error {
-
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_COMMIT_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 
 	// select all commits belongs to the project
-	cursor, err := db.Table("_tool_gitlab_commits gc").
-		Joins(`left join _tool_gitlab_project_commits gpc on (
+	clauses := []dal.Clause{
+		dal.Select("gc.*"),
+		dal.From("_tool_gitlab_commits gc"),
+		dal.Join(`left join _tool_gitlab_project_commits gpc on (
 			gpc.commit_sha = gc.sha
-		)`).
-		Select("gc.*").
-		Where("gpc.gitlab_project_id = ?", data.Options.ProjectId).
-		Rows()
+		)`),
+		dal.Where("gpc.gitlab_project_id = ?", data.Options.ProjectId),
+	}
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/gitlab/tasks/issue_collector.go b/plugins/gitlab/tasks/issue_collector.go
index 15342648..6f231893 100644
--- a/plugins/gitlab/tasks/issue_collector.go
+++ b/plugins/gitlab/tasks/issue_collector.go
@@ -20,6 +20,7 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"net/http"
 	"net/url"
 
@@ -44,7 +45,7 @@ var CollectApiIssuesMeta = core.SubTaskMeta{
 }
 
 func CollectApiIssues(taskCtx core.SubTaskContext) error {
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GitlabTaskData)
 
 	since := data.Since
@@ -52,10 +53,10 @@ func CollectApiIssues(taskCtx core.SubTaskContext) error {
 	// user didn't specify a time range to sync, try load from database
 	if since == nil {
 		var latestUpdated models.GitlabIssue
-		err := db.Model(&latestUpdated).
-			Where("project_id = ?", data.Options.ProjectId).
-			Order("gitlab_updated_at DESC").Limit(1).Find(&latestUpdated).Error
-
+		clause := []dal.Clause{
+			dal.Orderby("gitlab_updated_at DESC"),
+		}
+		err := db.First(&latestUpdated, clause...)
 		if err != nil {
 			return fmt.Errorf("failed to get latest gitlab issue record: %w", err)
 		}
@@ -83,15 +84,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext) error {
 		ApiClient:   data.ApiClient,
 		PageSize:    100,
 		Incremental: incremental,
-		/*
-			url may use arbitrary variables from different source in any order, we need GoTemplate to allow more
-			flexible for all kinds of possibility.
-			Pager contains information for a particular page, calculated by ApiCollector, and will be passed into
-			GoTemplate to generate a url for that page.
-			We want to do page-fetching in ApiCollector, because the logic are highly similar, by doing so, we can
-			avoid duplicate logic for every tasks, and when we have a better idea like improving performance, we can
-			do it in one place
-		*/
+
 		UrlTemplate: "projects/{{ .Params.ProjectId }}/issues",
 		/*
 			(Optional) Return query string for request, or you can plug them into UrlTemplate directly
@@ -107,23 +100,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext) error {
 
 			return query, nil
 		},
-		/*
-			Some api might do pagination by http headers
-		*/
-		//Header: func(pager *core.Pager) http.Header {
-		//},
-		/*
-			Sometimes, we need to collect data based on previous collected data, like jira changelog, it requires
-			issue_id as part of the url.
-			We can mimic `stdin` design, to accept a `Input` function which produces a `Iterator`, collector
-			should iterate all records, and do data-fetching for each on, either in parallel or sequential order
-			UrlTemplate: "api/3/issue/{{ Input.ID }}/changelog"
-		*/
-		//Input: databaseIssuesIterator,
-		/*
-			For api endpoint that returns number of total pages, ApiCollector can collect pages in parallel with ease,
-			or other techniques are required if this information was missing.
-		*/
+
 		GetTotalPages: GetTotalPagesFromResponse,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			var items []json.RawMessage
diff --git a/plugins/gitlab/tasks/issue_convertor.go b/plugins/gitlab/tasks/issue_convertor.go
index b8186791..acae625d 100644
--- a/plugins/gitlab/tasks/issue_convertor.go
+++ b/plugins/gitlab/tasks/issue_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 	"strconv"
 
@@ -38,13 +39,16 @@ var ConvertIssuesMeta = core.SubTaskMeta{
 }
 
 func ConvertIssues(taskCtx core.SubTaskContext) error {
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GitlabTaskData)
 	projectId := data.Options.ProjectId
 
-	issue := &gitlabModels.GitlabIssue{}
-	cursor, err := db.Model(issue).Where("project_id = ?", projectId).Rows()
-
+	clauses := []dal.Clause{
+		dal.Select("issues.*"),
+		dal.From("_tool_gitlab_issues issues"),
+		dal.Where("project_id = ?", projectId),
+	}
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/gitlab/tasks/issue_label_convertor.go b/plugins/gitlab/tasks/issue_label_convertor.go
index 4556ac23..3b591c3d 100644
--- a/plugins/gitlab/tasks/issue_label_convertor.go
+++ b/plugins/gitlab/tasks/issue_label_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer/didgen"
@@ -35,19 +36,24 @@ var ConvertIssueLabelsMeta = core.SubTaskMeta{
 }
 
 func ConvertIssueLabels(taskCtx core.SubTaskContext) error {
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GitlabTaskData)
 	projectId := data.Options.ProjectId
+	clauses := []dal.Clause{
+		dal.Select("*"),
+		dal.From(&gitlabModels.GitlabIssueLabel{}),
+		dal.Join(`left join _tool_gitlab_issues on 
+			_tool_gitlab_issues.gitlab_id = _tool_gitlab_issue_labels.issue_id`),
+		dal.Where("_tool_gitlab_issues.project_id = ?", projectId),
+		dal.Orderby("issue_id ASC"),
+	}
 
-	cursor, err := db.Model(&gitlabModels.GitlabIssueLabel{}).
-		Joins(`left join _tool_gitlab_issues on _tool_gitlab_issues.gitlab_id = _tool_gitlab_issue_labels.issue_id`).
-		Where("_tool_gitlab_issues.project_id = ?", projectId).
-		Order("issue_id ASC").
-		Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
+
 	issueIdGen := didgen.NewDomainIdGenerator(&gitlabModels.GitlabIssue{})
 
 	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
diff --git a/plugins/gitlab/tasks/mr_comment_convertor.go b/plugins/gitlab/tasks/mr_comment_convertor.go
index 858626bf..33c636f6 100644
--- a/plugins/gitlab/tasks/mr_comment_convertor.go
+++ b/plugins/gitlab/tasks/mr_comment_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer"
@@ -38,15 +39,21 @@ var ConvertMergeRequestCommentMeta = core.SubTaskMeta{
 func ConvertMergeRequestComment(taskCtx core.SubTaskContext) error {
 
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PROJECT_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabMergeRequestComment{}),
+		dal.Join(`left join _tool_gitlab_merge_requests on 
+			_tool_gitlab_merge_requests.gitlab_id = 
+			_tool_gitlab_merge_request_comments.merge_request_id`),
+		dal.Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId),
+	}
 
-	cursor, err := db.Model(&models.GitlabMergeRequestComment{}).
-		Joins("left join _tool_gitlab_merge_requests on _tool_gitlab_merge_requests.gitlab_id = _tool_gitlab_merge_request_comments.merge_request_id").
-		Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId).Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
+
 	domainIdGeneratorComment := didgen.NewDomainIdGenerator(&models.GitlabMergeRequestComment{})
 	prIdGen := didgen.NewDomainIdGenerator(&models.GitlabMergeRequest{})
 	userIdGen := didgen.NewDomainIdGenerator(&models.GitlabUser{})
diff --git a/plugins/gitlab/tasks/mr_commit_convertor.go b/plugins/gitlab/tasks/mr_commit_convertor.go
index 86c8f2c4..b2b9af90 100644
--- a/plugins/gitlab/tasks/mr_commit_convertor.go
+++ b/plugins/gitlab/tasks/mr_commit_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer/code"
@@ -36,12 +37,18 @@ var ConvertApiMergeRequestsCommitsMeta = core.SubTaskMeta{
 
 func ConvertApiMergeRequestsCommits(taskCtx core.SubTaskContext) error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_COMMITS_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 
-	cursor, err := db.Model(&models.GitlabMergeRequestCommit{}).
-		Joins(`left join _tool_gitlab_merge_requests on _tool_gitlab_merge_requests.gitlab_id = _tool_gitlab_merge_request_commits.merge_request_id`).
-		Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId).
-		Order("merge_request_id ASC").Rows()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabMergeRequestCommit{}),
+		dal.Join(`left join _tool_gitlab_merge_requests 
+			on _tool_gitlab_merge_requests.gitlab_id = 
+			_tool_gitlab_merge_request_commits.merge_request_id`),
+		dal.Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId),
+		dal.Orderby("merge_request_id ASC"),
+	}
+
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/gitlab/tasks/mr_convertor.go b/plugins/gitlab/tasks/mr_convertor.go
index c0e76e6b..58a0845d 100644
--- a/plugins/gitlab/tasks/mr_convertor.go
+++ b/plugins/gitlab/tasks/mr_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer"
@@ -37,17 +38,21 @@ var ConvertApiMergeRequestsMeta = core.SubTaskMeta{
 
 func ConvertApiMergeRequests(taskCtx core.SubTaskContext) error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabMergeRequest{}),
+		dal.Where("project_id=?", data.Options.ProjectId),
+	}
 
-	domainMrIdGenerator := didgen.NewDomainIdGenerator(&models.GitlabMergeRequest{})
-	domainRepoIdGenerator := didgen.NewDomainIdGenerator(&models.GitlabProject{})
-	//Find all piplines associated with the current projectid
-	cursor, err := db.Model(&models.GitlabMergeRequest{}).Where("project_id=?", data.Options.ProjectId).Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
 
+	domainMrIdGenerator := didgen.NewDomainIdGenerator(&models.GitlabMergeRequest{})
+	domainRepoIdGenerator := didgen.NewDomainIdGenerator(&models.GitlabProject{})
+
 	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
 		RawDataSubTaskArgs: *rawDataSubTaskArgs,
 		InputRowType:       reflect.TypeOf(models.GitlabMergeRequest{}),
diff --git a/plugins/gitlab/tasks/mr_enricher.go b/plugins/gitlab/tasks/mr_enricher.go
index d769f2e5..f1f5e555 100644
--- a/plugins/gitlab/tasks/mr_enricher.go
+++ b/plugins/gitlab/tasks/mr_enricher.go
@@ -18,11 +18,13 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/apache/incubator-devlake/plugins/helper"
+	"reflect"
 	"time"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	gitlabModels "github.com/apache/incubator-devlake/plugins/gitlab/models"
-	"gorm.io/gorm/clause"
+	"github.com/apache/incubator-devlake/plugins/gitlab/models"
 )
 
 var EnrichMergeRequestsMeta = core.SubTaskMeta{
@@ -33,55 +35,80 @@ var EnrichMergeRequestsMeta = core.SubTaskMeta{
 }
 
 func EnrichMergeRequests(taskCtx core.SubTaskContext) error {
-	data := taskCtx.GetData().(*GitlabTaskData)
-	db := taskCtx.GetDb()
-	// get mrs from theDB
-	cursor, err := db.Model(&gitlabModels.GitlabMergeRequest{}).Where("project_id = ?", data.Options.ProjectId).Rows()
+	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_TABLE)
+
+	db := taskCtx.GetDal()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabMergeRequest{}),
+		dal.Where("project_id=?", data.Options.ProjectId),
+	}
+
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
-	}
+	} // get mrs from theDB
 	defer cursor.Close()
 
-	gitlabMr := &gitlabModels.GitlabMergeRequest{}
-	for cursor.Next() {
-		err = db.ScanRows(cursor, gitlabMr)
-		if err != nil {
-			return err
-		}
-		// enrich first_comment_time field
-		notes := make([]gitlabModels.GitlabMergeRequestNote, 0)
-		// `system` = 0 is needed since we only care about human comments
-		db.Where("merge_request_id = ? AND is_system = ?", gitlabMr.GitlabId, false).
-			Order("gitlab_created_at asc").Find(&notes)
-		commits := make([]gitlabModels.GitlabCommit, 0)
-		db.Joins("join _tool_gitlab_merge_request_commits gmrc on gmrc.commit_sha = _tool_gitlab_commits.sha").
-			Where("merge_request_id = ?", gitlabMr.GitlabId).Order("authored_date asc").Find(&commits)
-		// calculate reviewRounds from commits and notes
-		reviewRounds := getReviewRounds(commits, notes)
-		gitlabMr.ReviewRounds = reviewRounds
-
-		if len(notes) > 0 {
-			earliestNote, err := findEarliestNote(notes)
+	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
+		RawDataSubTaskArgs: *rawDataSubTaskArgs,
+		InputRowType:       reflect.TypeOf(models.GitlabMergeRequest{}),
+		Input:              cursor,
+
+		Convert: func(inputRow interface{}) ([]interface{}, error) {
+			gitlabMr := inputRow.(*models.GitlabMergeRequest)
+			// enrich first_comment_time field
+			notes := make([]models.GitlabMergeRequestNote, 0)
+			// `system` = 0 is needed since we only care about human comments
+			noteClauses := []dal.Clause{
+				dal.From(&models.GitlabMergeRequestNote{}),
+				dal.Where("merge_request_id = ? AND is_system = ?", gitlabMr.GitlabId, false),
+				dal.Orderby("gitlab_created_at asc"),
+			}
+			err = db.All(&notes, noteClauses...)
 			if err != nil {
-				return err
+				return nil, err
 			}
-			if earliestNote != nil {
-				gitlabMr.FirstCommentTime = &earliestNote.GitlabCreatedAt
+
+			commits := make([]models.GitlabCommit, 0)
+			commitClauses := []dal.Clause{
+				dal.From(&models.GitlabCommit{}),
+				dal.Join(`join _tool_gitlab_merge_request_commits gmrc 
+					on gmrc.commit_sha = _tool_gitlab_commits.sha`),
+				dal.Where("merge_request_id = ?", gitlabMr.GitlabId),
+				dal.Orderby("authored_date asc"),
+			}
+			err = db.All(&commits, commitClauses...)
+			if err != nil {
+				return nil, err
 			}
-		}
 
-		err = db.Clauses(clause.OnConflict{
-			UpdateAll: true,
-		}).Create(gitlabMr).Error
-		if err != nil {
-			return err
-		}
+			// calculate reviewRounds from commits and notes
+			reviewRounds := getReviewRounds(commits, notes)
+			gitlabMr.ReviewRounds = reviewRounds
+
+			if len(notes) > 0 {
+				earliestNote, err := findEarliestNote(notes)
+				if err != nil {
+					return nil, err
+				}
+				if earliestNote != nil {
+					gitlabMr.FirstCommentTime = &earliestNote.GitlabCreatedAt
+				}
+			}
+			return []interface{}{
+				gitlabMr,
+			}, nil
+		},
+	})
+	if err != nil {
+		return err
 	}
-	return nil
+
+	return converter.Execute()
 }
 
-func findEarliestNote(notes []gitlabModels.GitlabMergeRequestNote) (*gitlabModels.GitlabMergeRequestNote, error) {
-	var earliestNote *gitlabModels.GitlabMergeRequestNote
+func findEarliestNote(notes []models.GitlabMergeRequestNote) (*models.GitlabMergeRequestNote, error) {
+	var earliestNote *models.GitlabMergeRequestNote
 	earliestTime := time.Now()
 	for i := range notes {
 		if !notes[i].Resolvable {
@@ -96,7 +123,7 @@ func findEarliestNote(notes []gitlabModels.GitlabMergeRequestNote) (*gitlabModel
 	return earliestNote, nil
 }
 
-func getReviewRounds(commits []gitlabModels.GitlabCommit, notes []gitlabModels.GitlabMergeRequestNote) int {
+func getReviewRounds(commits []models.GitlabCommit, notes []models.GitlabMergeRequestNote) int {
 	i := 0
 	j := 0
 	reviewRounds := 0
diff --git a/plugins/gitlab/tasks/note_convertor.go b/plugins/gitlab/tasks/note_convertor.go
index bd062a8c..399e36a3 100644
--- a/plugins/gitlab/tasks/note_convertor.go
+++ b/plugins/gitlab/tasks/note_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer"
@@ -38,15 +39,21 @@ var ConvertApiNotesMeta = core.SubTaskMeta{
 func ConvertApiNotes(taskCtx core.SubTaskContext) error {
 
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PROJECT_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabMergeRequestNote{}),
+		dal.Join(`left join _tool_gitlab_merge_requests 
+			on _tool_gitlab_merge_requests.gitlab_id = 
+			_tool_gitlab_merge_request_notes.merge_request_id`),
+		dal.Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId),
+	}
 
-	cursor, err := db.Model(&models.GitlabMergeRequestNote{}).
-		Joins("left join _tool_gitlab_merge_requests on _tool_gitlab_merge_requests.gitlab_id = _tool_gitlab_merge_request_notes.merge_request_id").
-		Where("_tool_gitlab_merge_requests.project_id = ?", data.Options.ProjectId).Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
+
 	domainIdGeneratorNote := didgen.NewDomainIdGenerator(&models.GitlabMergeRequestNote{})
 	prIdGen := didgen.NewDomainIdGenerator(&models.GitlabMergeRequest{})
 	userIdGen := didgen.NewDomainIdGenerator(&models.GitlabUser{})
diff --git a/plugins/gitlab/tasks/project_convertor.go b/plugins/gitlab/tasks/project_convertor.go
index 7a1a35c9..f191ea3c 100644
--- a/plugins/gitlab/tasks/project_convertor.go
+++ b/plugins/gitlab/tasks/project_convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/models/domainlayer"
@@ -38,14 +39,16 @@ var ConvertProjectMeta = core.SubTaskMeta{
 func ConvertApiProjects(taskCtx core.SubTaskContext) error {
 
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PROJECT_TABLE)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
+	clauses := []dal.Clause{
+		dal.From(&models.GitlabProject{}),
+		dal.Where("gitlab_id=?", data.Options.ProjectId),
+	}
 
-	//Find all piplines associated with the current projectid
-	cursor, err := db.Model(&models.GitlabProject{}).Where("gitlab_id=?", data.Options.ProjectId).Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
-	defer cursor.Close()
 
 	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
 		RawDataSubTaskArgs: *rawDataSubTaskArgs,
diff --git a/plugins/gitlab/tasks/shared.go b/plugins/gitlab/tasks/shared.go
index 3c5ffb1a..f29b5260 100644
--- a/plugins/gitlab/tasks/shared.go
+++ b/plugins/gitlab/tasks/shared.go
@@ -20,6 +20,7 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -27,7 +28,6 @@ import (
 	"strconv"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/apache/incubator-devlake/plugins/gitlab/models"
 	"github.com/apache/incubator-devlake/plugins/helper"
 )
 
@@ -93,25 +93,23 @@ func CreateRawDataSubTaskArgs(taskCtx core.SubTaskContext, Table string) (*helpe
 	return RawDataSubTaskArgs, data
 }
 
-func GetMergeRequestsIterator(taskCtx core.SubTaskContext) (*helper.CursorIterator, error) {
-	db := taskCtx.GetDb()
+func GetMergeRequestsIterator(taskCtx core.SubTaskContext) (*helper.DalCursorIterator, error) {
+	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GitlabTaskData)
-	cursor, err := db.Model(&models.GitlabMergeRequest{}).Select("gitlab_id, iid").
-		Where("project_id = ?", data.Options.ProjectId).Select("gitlab_id,iid").Rows()
-	if err != nil {
-		return nil, err
+	clauses := []dal.Clause{
+		dal.Select("gmr.gitlab_id, gmr.iid"),
+		dal.From("_tool_gitlab_merge_requests gmr"),
+		dal.Where(
+			`gmr.project_id = ?`,
+			data.Options.ProjectId,
+		),
 	}
-
-	return helper.NewCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
-}
-
-func GetPipelinesIterator(taskCtx core.SubTaskContext) (*helper.CursorIterator, error) {
-	db := taskCtx.GetDb()
-	data := taskCtx.GetData().(*GitlabTaskData)
-	cursor, err := db.Model(&models.GitlabPipeline{}).Where("project_id = ?", data.Options.ProjectId).Select("gitlab_id").Rows()
+	// construct the input iterator
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return nil, err
 	}
+	defer cursor.Close()
 
-	return helper.NewCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
+	return helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(GitlabInput{}))
 }