You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/13 08:35:12 UTC
[incubator-devlake] branch main updated: fix(gitlab): modify migraion script
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 94c7dd82 fix(gitlab): modify migraion script
94c7dd82 is described below
commit 94c7dd8203795b3f0a7990b4f375b3f2a5819eba
Author: Yingchu Chen <yi...@merico.dev>
AuthorDate: Fri Sep 9 19:16:07 2022 +0800
fix(gitlab): modify migraion script
relate to #2993
---
impl/dalgorm/dalgorm.go | 5 ++
plugins/core/dal/dal.go | 2 +
plugins/gitlab/api/blueprint.go | 2 +-
plugins/gitlab/api/connection.go | 2 +-
plugins/gitlab/api/init.go | 6 +-
plugins/gitlab/api/proxy.go | 2 +-
.../20220906_fix_duration_to_float8.go | 17 +++--
plugins/helper/batch_save.go | 72 ++++++++++++++++++++--
8 files changed, 90 insertions(+), 18 deletions(-)
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 2b41cfe6..59ac6472 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -146,6 +146,11 @@ func (d *Dalgorm) Delete(entity interface{}, clauses ...dal.Clause) error {
return buildTx(d.db, clauses).Delete(entity).Error
}
+// UpdateColumns batch records in database
+func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause) error {
+ return buildTx(d.db, clauses).UpdateColumns(entity).Error
+}
+
// GetColumns FIXME ...
func (d *Dalgorm) GetColumns(dst schema.Tabler, filter func(columnMeta dal.ColumnMeta) bool) (cms []dal.ColumnMeta, err error) {
columnTypes, err := d.db.Migrator().ColumnTypes(dst.TableName())
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 810609c9..2097dc10 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -73,6 +73,8 @@ type Dal interface {
Create(entity interface{}, clauses ...Clause) error
// Update updates record
Update(entity interface{}, clauses ...Clause) error
+ // UpdateColumns batch records in database
+ UpdateColumns(entity interface{}, clauses ...Clause) error
// CreateOrUpdate tries to create the record, or fallback to update all if failed
CreateOrUpdate(entity interface{}, clauses ...Clause) error
// CreateIfNotExist tries to create the record if not exist
diff --git a/plugins/gitlab/api/blueprint.go b/plugins/gitlab/api/blueprint.go
index e0019610..9c35cab2 100644
--- a/plugins/gitlab/api/blueprint.go
+++ b/plugins/gitlab/api/blueprint.go
@@ -108,7 +108,7 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta, connectionId uint64, scop
},
10*time.Second,
connection.Proxy,
- basicRes,
+ BasicRes,
)
if err != nil {
return nil, err
diff --git a/plugins/gitlab/api/connection.go b/plugins/gitlab/api/connection.go
index 957fe682..5f953cd1 100644
--- a/plugins/gitlab/api/connection.go
+++ b/plugins/gitlab/api/connection.go
@@ -60,7 +60,7 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
},
3*time.Second,
connection.Proxy,
- basicRes,
+ BasicRes,
)
if err != nil {
return nil, err
diff --git a/plugins/gitlab/api/init.go b/plugins/gitlab/api/init.go
index 6774e148..f1fb68f9 100644
--- a/plugins/gitlab/api/init.go
+++ b/plugins/gitlab/api/init.go
@@ -27,13 +27,13 @@ import (
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
-var basicRes core.BasicRes
+var BasicRes core.BasicRes
func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+ BasicRes = helper.NewDefaultBasicRes(config, logger, database)
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
- basicRes,
+ BasicRes,
vld,
)
}
diff --git a/plugins/gitlab/api/proxy.go b/plugins/gitlab/api/proxy.go
index ae56772f..70f9b23c 100644
--- a/plugins/gitlab/api/proxy.go
+++ b/plugins/gitlab/api/proxy.go
@@ -47,7 +47,7 @@ func Proxy(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
},
30*time.Second,
connection.Proxy,
- basicRes,
+ BasicRes,
)
if err != nil {
return nil, err
diff --git a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
index 903b0073..e2009c78 100644
--- a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
+++ b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
@@ -19,7 +19,11 @@ package migrationscripts
import (
"context"
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/gitlab/api"
+ "github.com/apache/incubator-devlake/plugins/helper"
"gorm.io/gorm"
+ "reflect"
)
type fixDurationToFloat8 struct{}
@@ -41,22 +45,23 @@ func (*fixDurationToFloat8) Up(ctx context.Context, db *gorm.DB) error {
if err != nil {
return err
}
-
cursor, err := db.Model(&GitlabJob20220906{}).Select([]string{"connection_id", "gitlab_id", "duration"}).Rows()
if err != nil {
return err
}
+ batch, err := helper.NewBatchUpdate(api.BasicRes, reflect.TypeOf(&GitlabJob20220906{}), 500)
+ if err != nil {
+ return errors.Default.Wrap(err, "error getting batch from table", errors.UserMessage("Internal Converter execution error"))
+ }
+ defer batch.Close()
for cursor.Next() {
job := GitlabJob20220906{}
err = db.ScanRows(cursor, &job)
if err != nil {
return err
}
- err = db.
- Model(job).
- Where(`connection_id=? AND gitlab_id=?`, job.ConnectionId, job.GitlabId).
- Update(`duration2`, job.Duration).
- Error
+ job.Duration2 = job.Duration
+ err = batch.Add(&job)
if err != nil {
return err
}
diff --git a/plugins/helper/batch_save.go b/plugins/helper/batch_save.go
index c76b00ac..d6990a3f 100644
--- a/plugins/helper/batch_save.go
+++ b/plugins/helper/batch_save.go
@@ -42,6 +42,11 @@ type BatchSave struct {
primaryKey []reflect.StructField
}
+// BatchUpdate will update records by batch
+type BatchUpdate struct {
+ *BatchSave
+}
+
// NewBatchSave creates a new BatchSave instance
func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchSave, error) {
if slotType.Kind() != reflect.Ptr {
@@ -67,8 +72,49 @@ func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*Bat
}, nil
}
+// NewBatchUpdate creates a new BatchUpdate instance
+func NewBatchUpdate(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchUpdate, error) {
+ batchSave, err := NewBatchSave(basicRes, slotType, size)
+ if err != nil {
+ return nil, err
+ }
+ batchUpdate := BatchUpdate{
+ BatchSave: batchSave,
+ }
+ return &batchUpdate, nil
+}
+
// Add record to cache. BatchSave would flush them into Database when cache is max out
func (c *BatchSave) Add(slot interface{}) error {
+ err := c.prepareForFlush(slot)
+ if err != nil {
+ return err
+ }
+ // flush out into database if max outed
+ if c.current == c.size {
+ return c.Flush()
+ } else if c.current%100 == 0 {
+ c.log.Debug("batch save current: %d", c.current)
+ }
+ return nil
+}
+
+// Update record to cache. BatchSave would flush them into Database when cache is max out
+func (c *BatchUpdate) Update(slot interface{}) error {
+ err := c.prepareForFlush(slot)
+ if err != nil {
+ return err
+ }
+ // flush out into database if max outed
+ if c.current == c.size {
+ return c.FlushUpdate()
+ } else if c.current%100 == 0 {
+ c.log.Debug("batch save current: %d", c.current)
+ }
+ return nil
+}
+
+func (c *BatchSave) prepareForFlush(slot interface{}) error {
// type checking
if reflect.TypeOf(slot) != c.slotType {
return errors.Default.New("sub cache type mismatched")
@@ -89,12 +135,6 @@ func (c *BatchSave) Add(slot interface{}) error {
}
c.slots.Index(c.current).Set(reflect.ValueOf(slot))
c.current++
- // flush out into database if max outed
- if c.current == c.size {
- return c.Flush()
- } else if c.current%100 == 0 {
- c.log.Debug("batch save current: %d", c.current)
- }
return nil
}
@@ -110,6 +150,18 @@ func (c *BatchSave) Flush() error {
return nil
}
+// FlushUpdate update cached records into database
+func (c *BatchUpdate) FlushUpdate() error {
+ err := c.db.UpdateColumns(c.slots.Slice(0, c.current).Interface())
+ if err != nil {
+ return err
+ }
+ c.log.Debug("batch save flush total %d records to database", c.current)
+ c.current = 0
+ c.valueIndex = make(map[string]int)
+ return nil
+}
+
// Close would flash the cache and release resources
func (c *BatchSave) Close() error {
if c.current > 0 {
@@ -118,6 +170,14 @@ func (c *BatchSave) Close() error {
return nil
}
+// Close would flash the cache and release resources
+func (c *BatchUpdate) Close() error {
+ if c.current > 0 {
+ return c.FlushUpdate()
+ }
+ return nil
+}
+
func getKeyValue(iface interface{}, primaryKey []reflect.StructField) string {
var ss []string
ifv := reflect.ValueOf(iface)