You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/13 08:35:12 UTC

[incubator-devlake] branch main updated: fix(gitlab): modify migration script

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 94c7dd82 fix(gitlab): modify migration script
94c7dd82 is described below

commit 94c7dd8203795b3f0a7990b4f375b3f2a5819eba
Author: Yingchu Chen <yi...@merico.dev>
AuthorDate: Fri Sep 9 19:16:07 2022 +0800

    fix(gitlab): modify migration script
    
    relates to #2993
---
 impl/dalgorm/dalgorm.go                            |  5 ++
 plugins/core/dal/dal.go                            |  2 +
 plugins/gitlab/api/blueprint.go                    |  2 +-
 plugins/gitlab/api/connection.go                   |  2 +-
 plugins/gitlab/api/init.go                         |  6 +-
 plugins/gitlab/api/proxy.go                        |  2 +-
 .../20220906_fix_duration_to_float8.go             | 17 +++--
 plugins/helper/batch_save.go                       | 72 ++++++++++++++++++++--
 8 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 2b41cfe6..59ac6472 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -146,6 +146,11 @@ func (d *Dalgorm) Delete(entity interface{}, clauses ...dal.Clause) error {
 	return buildTx(d.db, clauses).Delete(entity).Error
 }
 
+// UpdateColumns batch updates records in database
+func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause) error {
+	return buildTx(d.db, clauses).UpdateColumns(entity).Error
+}
+
 // GetColumns FIXME ...
 func (d *Dalgorm) GetColumns(dst schema.Tabler, filter func(columnMeta dal.ColumnMeta) bool) (cms []dal.ColumnMeta, err error) {
 	columnTypes, err := d.db.Migrator().ColumnTypes(dst.TableName())
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 810609c9..2097dc10 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -73,6 +73,8 @@ type Dal interface {
 	Create(entity interface{}, clauses ...Clause) error
 	// Update updates record
 	Update(entity interface{}, clauses ...Clause) error
+	// UpdateColumns batch updates records in database
+	UpdateColumns(entity interface{}, clauses ...Clause) error
 	// CreateOrUpdate tries to create the record, or fallback to update all if failed
 	CreateOrUpdate(entity interface{}, clauses ...Clause) error
 	// CreateIfNotExist tries to create the record if not exist
diff --git a/plugins/gitlab/api/blueprint.go b/plugins/gitlab/api/blueprint.go
index e0019610..9c35cab2 100644
--- a/plugins/gitlab/api/blueprint.go
+++ b/plugins/gitlab/api/blueprint.go
@@ -108,7 +108,7 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta, connectionId uint64, scop
 				},
 				10*time.Second,
 				connection.Proxy,
-				basicRes,
+				BasicRes,
 			)
 			if err != nil {
 				return nil, err
diff --git a/plugins/gitlab/api/connection.go b/plugins/gitlab/api/connection.go
index 957fe682..5f953cd1 100644
--- a/plugins/gitlab/api/connection.go
+++ b/plugins/gitlab/api/connection.go
@@ -60,7 +60,7 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
 		},
 		3*time.Second,
 		connection.Proxy,
-		basicRes,
+		BasicRes,
 	)
 	if err != nil {
 		return nil, err
diff --git a/plugins/gitlab/api/init.go b/plugins/gitlab/api/init.go
index 6774e148..f1fb68f9 100644
--- a/plugins/gitlab/api/init.go
+++ b/plugins/gitlab/api/init.go
@@ -27,13 +27,13 @@ import (
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
-var basicRes core.BasicRes
+var BasicRes core.BasicRes
 
 func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-	basicRes = helper.NewDefaultBasicRes(config, logger, database)
+	BasicRes = helper.NewDefaultBasicRes(config, logger, database)
 	vld = validator.New()
 	connectionHelper = helper.NewConnectionHelper(
-		basicRes,
+		BasicRes,
 		vld,
 	)
 }
diff --git a/plugins/gitlab/api/proxy.go b/plugins/gitlab/api/proxy.go
index ae56772f..70f9b23c 100644
--- a/plugins/gitlab/api/proxy.go
+++ b/plugins/gitlab/api/proxy.go
@@ -47,7 +47,7 @@ func Proxy(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
 		},
 		30*time.Second,
 		connection.Proxy,
-		basicRes,
+		BasicRes,
 	)
 	if err != nil {
 		return nil, err
diff --git a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
index 903b0073..e2009c78 100644
--- a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
+++ b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
@@ -19,7 +19,11 @@ package migrationscripts
 
 import (
 	"context"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/plugins/gitlab/api"
+	"github.com/apache/incubator-devlake/plugins/helper"
 	"gorm.io/gorm"
+	"reflect"
 )
 
 type fixDurationToFloat8 struct{}
@@ -41,22 +45,23 @@ func (*fixDurationToFloat8) Up(ctx context.Context, db *gorm.DB) error {
 	if err != nil {
 		return err
 	}
-
 	cursor, err := db.Model(&GitlabJob20220906{}).Select([]string{"connection_id", "gitlab_id", "duration"}).Rows()
 	if err != nil {
 		return err
 	}
+	batch, err := helper.NewBatchUpdate(api.BasicRes, reflect.TypeOf(&GitlabJob20220906{}), 500)
+	if err != nil {
+		return errors.Default.Wrap(err, "error getting batch from table", errors.UserMessage("Internal Converter execution error"))
+	}
+	defer batch.Close()
 	for cursor.Next() {
 		job := GitlabJob20220906{}
 		err = db.ScanRows(cursor, &job)
 		if err != nil {
 			return err
 		}
-		err = db.
-			Model(job).
-			Where(`connection_id=? AND gitlab_id=?`, job.ConnectionId, job.GitlabId).
-			Update(`duration2`, job.Duration).
-			Error
+		job.Duration2 = job.Duration
+		err = batch.Add(&job)
 		if err != nil {
 			return err
 		}
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index c76b00ac..d6990a3f 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -42,6 +42,11 @@ type BatchSave struct {
 	primaryKey []reflect.StructField
 }
 
+// BatchUpdate will update records by batch
+type BatchUpdate struct {
+	*BatchSave
+}
+
 // NewBatchSave creates a new BatchSave instance
 func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchSave, error) {
 	if slotType.Kind() != reflect.Ptr {
@@ -67,8 +72,49 @@ func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*Bat
 	}, nil
 }
 
+// NewBatchUpdate creates a new BatchUpdate instance
+func NewBatchUpdate(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchUpdate, error) {
+	batchSave, err := NewBatchSave(basicRes, slotType, size)
+	if err != nil {
+		return nil, err
+	}
+	batchUpdate := BatchUpdate{
+		BatchSave: batchSave,
+	}
+	return &batchUpdate, nil
+}
+
 // Add record to cache. BatchSave would flush them into Database when cache is max out
 func (c *BatchSave) Add(slot interface{}) error {
+	err := c.prepareForFlush(slot)
+	if err != nil {
+		return err
+	}
+	// flush out into the database once the cache is full
+	if c.current == c.size {
+		return c.Flush()
+	} else if c.current%100 == 0 {
+		c.log.Debug("batch save current: %d", c.current)
+	}
+	return nil
+}
+
+// Update adds a record to the cache. BatchUpdate flushes the cache into the database when it is full
+func (c *BatchUpdate) Update(slot interface{}) error {
+	err := c.prepareForFlush(slot)
+	if err != nil {
+		return err
+	}
+	// flush out into the database once the cache is full
+	if c.current == c.size {
+		return c.FlushUpdate()
+	} else if c.current%100 == 0 {
+		c.log.Debug("batch save current: %d", c.current)
+	}
+	return nil
+}
+
+func (c *BatchSave) prepareForFlush(slot interface{}) error {
 	// type checking
 	if reflect.TypeOf(slot) != c.slotType {
 		return errors.Default.New("sub cache type mismatched")
@@ -89,12 +135,6 @@ func (c *BatchSave) Add(slot interface{}) error {
 	}
 	c.slots.Index(c.current).Set(reflect.ValueOf(slot))
 	c.current++
-	// flush out into database if max outed
-	if c.current == c.size {
-		return c.Flush()
-	} else if c.current%100 == 0 {
-		c.log.Debug("batch save current: %d", c.current)
-	}
 	return nil
 }
 
@@ -110,6 +150,18 @@ func (c *BatchSave) Flush() error {
 	return nil
 }
 
+// FlushUpdate updates the database with the cached records
+func (c *BatchUpdate) FlushUpdate() error {
+	err := c.db.UpdateColumns(c.slots.Slice(0, c.current).Interface())
+	if err != nil {
+		return err
+	}
+	c.log.Debug("batch save flush total %d records to database", c.current)
+	c.current = 0
+	c.valueIndex = make(map[string]int)
+	return nil
+}
+
 // Close would flash the cache and release resources
 func (c *BatchSave) Close() error {
 	if c.current > 0 {
@@ -118,6 +170,14 @@ func (c *BatchSave) Close() error {
 	return nil
 }
 
+// Close would flush the cache and release resources
+func (c *BatchUpdate) Close() error {
+	if c.current > 0 {
+		return c.FlushUpdate()
+	}
+	return nil
+}
+
 func getKeyValue(iface interface{}, primaryKey []reflect.StructField) string {
 	var ss []string
 	ifv := reflect.ValueOf(iface)